diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 4faea6e5e3..520a8e9c2c 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -82,7 +82,7 @@ typedef struct STuplePos { int32_t pageId; int32_t offset; }; - STupleKey streamTupleKey; + SWinKey streamTupleKey; }; } STuplePos; diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 5b125b42d4..1cc61ec072 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -86,9 +86,8 @@ typedef struct { int64_t number; } SStreamStateCur; -int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen); -int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen); -int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key); +int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); +int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen); int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index ba4ef5a669..8f5d6c38f3 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -110,14 +110,14 @@ typedef struct { int64_t ver; int32_t* dataRef; SPackedData submit; -} SStreamDataSubmit2; +} SStreamDataSubmit; typedef struct { int8_t type; int64_t ver; SArray* dataRefs; // SArray SArray* submits; // SArray -} SStreamMergedSubmit2; +} SStreamMergedSubmit; typedef struct { int8_t type; @@ -205,10 +205,10 @@ static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) { void* streamQueueNextItem(SStreamQueue* queue); -SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type); -void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit); +SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type); +void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit); -SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit); +SStreamDataSubmit* streamSubmitBlockClone(SStreamDataSubmit* pSubmit); typedef struct { char* qmsg; diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index d50f0e0a31..7124e2d251 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -38,8 +38,8 @@ typedef SList SStreamSnapshot; typedef TSKEY (*GetTsFun)(void*); -SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, GetTsFun fp, void* pFile, - TSKEY delMark); +SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, + GetTsFun fp, void* pFile, TSKEY delMark); void streamFileStateDestroy(SStreamFileState* pFileState); void streamFileStateClear(SStreamFileState* pFileState); bool needClearDiskBuff(SStreamFileState* pFileState); @@ -56,6 +56,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState); int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list); int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark); +int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 68b8dd7201..0713150b48 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1306,7 +1306,7 @@ int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) { int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (pTask->taskLevel == TASK_LEVEL__SOURCE && mndPauseStreamTask(pTrans, pTask) < 0) { + if (pTask->taskLevel != TASK_LEVEL__SINK && mndPauseStreamTask(pTrans, pTask) < 0) { return -1; } } @@ -1430,7 +1430,7 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (pTask->taskLevel == TASK_LEVEL__SOURCE && mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) { + if (pTask->taskLevel != TASK_LEVEL__SINK && mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) { return -1; } } diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 2447a23edc..9652473f9d 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -46,28 +46,28 @@ typedef struct STqOffsetStore STqOffsetStore; // tqPush -//typedef struct { -// // msg info -// int64_t consumerId; -// int64_t reqOffset; -// int64_t processedVer; -// int32_t epoch; -// // rpc info -// int64_t reqId; -// SRpcHandleInfo rpcInfo; -// tmr_h timerId; -// int8_t tmrStopped; -// // exec -// int8_t inputStatus; -// int8_t execStatus; -// SStreamQueue inputQ; -// SRWLatch lock; -//} STqPushHandle; +// typedef struct { +// // msg info +// int64_t consumerId; +// int64_t reqOffset; +// int64_t processedVer; +// int32_t epoch; +// // rpc info +// int64_t reqId; +// SRpcHandleInfo rpcInfo; +// tmr_h timerId; +// int8_t tmrStopped; +// // exec +// int8_t inputStatus; +// int8_t execStatus; +// SStreamQueue inputQ; +// SRWLatch lock; +// } STqPushHandle; // tqExec typedef struct { - char* qmsg; // SubPlanToString + char* qmsg; // SubPlanToString } STqExecCol; typedef struct { @@ -79,35 +79,35 @@ typedef struct { } STqExecDb; typedef struct { - int8_t subType; - STqReader* pTqReader; - qTaskInfo_t task; + int8_t subType; + STqReader* pTqReader; + qTaskInfo_t task; union { STqExecCol execCol; STqExecTb execTb; STqExecDb execDb; }; - int32_t numOfCols; // number of out pout column, temporarily used + int32_t numOfCols; // number of out pout column, temporarily used } STqExecHandle; -typedef enum tq_handle_status{ +typedef enum tq_handle_status { TMQ_HANDLE_STATUS_IDLE = 0, TMQ_HANDLE_STATUS_EXEC = 1, -}tq_handle_status; +} tq_handle_status; typedef struct { - char subKey[TSDB_SUBSCRIBE_KEY_LEN]; - int64_t consumerId; - int32_t epoch; - int8_t fetchMeta; - int64_t snapshotVer; - SWalReader* pWalReader; - SWalRef* pRef; -// STqPushHandle pushHandle; // push - STqExecHandle execHandle; // exec - SRpcMsg* msg; - int32_t noDataPollCnt; - tq_handle_status status; + char subKey[TSDB_SUBSCRIBE_KEY_LEN]; + int64_t consumerId; + int32_t epoch; + int8_t fetchMeta; + int64_t snapshotVer; + SWalReader* pWalReader; + SWalRef* pRef; + // STqPushHandle pushHandle; // push + STqExecHandle execHandle; // exec + SRpcMsg* msg; + int32_t noDataPollCnt; + tq_handle_status status; } STqHandle; struct STQ { @@ -147,8 +147,8 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea // tqExec int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows); int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision); -int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type, - int32_t vgId); +int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, + int32_t type, int32_t vgId); int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId); // tqMeta diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 75e25b26b1..a8c2a09319 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1296,7 +1296,11 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms } streamMetaReleaseTask(pTq->pStreamMeta, pTask); - tqStartStreamTasks(pTq); + if (pTask->taskLevel == TASK_LEVEL__SOURCE) { + tqStartStreamTasks(pTq); + } else { + streamSchedExec(pTask); + } } return 0; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 53c2da42c6..b756f99f32 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -325,7 +325,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, const char* id) { memcpy(data, pBody, len); SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data}; - *pItem = (SStreamQueueItem*)streamDataSubmitNew(data1, STREAM_INPUT__DATA_SUBMIT); + *pItem = (SStreamQueueItem*)streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT); if (*pItem == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; tqError("%s failed to create data submit for stream since out of memory", id); @@ -447,8 +447,8 @@ bool tqNextBlockImpl(STqReader* pReader, const char* idstr) { int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData); while (pReader->nextBlk < numOfBlocks) { - tqDebug("tq reader next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, - pReader->msg.ver, pReader->nextBlk, numOfBlocks, idstr); + tqDebug("tq reader next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, pReader->msg.ver, + pReader->nextBlk, numOfBlocks, idstr); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); if (pReader->tbIdHash == NULL) { @@ -705,8 +705,9 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, const char* id) { SColVal colVal; tRowGet(pRow, pTSchema, sourceIdx, &colVal); if (colVal.cid < pColData->info.colId) { -// tqDebug("colIndex:%d column id:%d in row, ignore, the required colId:%d, total cols in schema:%d", -// sourceIdx, colVal.cid, pColData->info.colId, pTSchema->numOfCols); + // tqDebug("colIndex:%d column id:%d in row, ignore, the required colId:%d, total cols in + // schema:%d", + // sourceIdx, colVal.cid, pColData->info.colId, pTSchema->numOfCols); sourceIdx++; continue; } else if (colVal.cid == pColData->info.colId) { diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 4a9e3dcee7..0bd7d9a57b 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -17,12 +17,12 @@ #include "tmsg.h" #include "tq.h" -#define MAX_CATCH_NUM 10240 +#define MAX_CACHE_TABLE_INFO_NUM 10240 -typedef struct STblInfo { +typedef struct STableSinkInfo { uint64_t uid; char tbName[TSDB_TABLE_NAME_LEN]; -} STblInfo; +} STableSinkInfo; int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, const char* pIdStr) { @@ -97,19 +97,21 @@ end: return ret; } -int32_t tqGetTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STblInfo** pTbl) { - void* pVal = tSimpleHashGet(tblInfo, &groupId, sizeof(uint64_t)); +static int32_t tqGetTableInfo(SSHashObj* pTableInfoMap,uint64_t groupId, STableSinkInfo** pInfo) { + void* pVal = tSimpleHashGet(pTableInfoMap, &groupId, sizeof(uint64_t)); if (pVal) { - *pTbl = *(STblInfo**)pVal; + *pInfo = *(STableSinkInfo**)pVal; return TSDB_CODE_SUCCESS; } + return TSDB_CODE_FAILED; } -int32_t tqPutTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STblInfo* pTbl) { - if (tSimpleHashGetSize(tblInfo) > MAX_CATCH_NUM) { - return TSDB_CODE_SUCCESS; +int32_t tqPutTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STableSinkInfo* pTbl) { + if (tSimpleHashGetSize(tblInfo) > MAX_CACHE_TABLE_INFO_NUM) { + return TSDB_CODE_FAILED; } + return tSimpleHashPut(tblInfo, &groupId, sizeof(uint64_t), &pTbl, POINTER_BYTES); } @@ -274,7 +276,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d crTblArray = NULL; } else { SSubmitTbData tbData = {0}; - tqDebug("tq sink pipe, convert block1 %d, rows: %d", i, rows); + tqDebug("tq sink pipe, convert block:%d, rows:%d", i, rows); if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) { goto _end; @@ -283,35 +285,35 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d tbData.suid = suid; tbData.uid = 0; // uid is assigned by vnode tbData.sver = pTSchema->version; - STblInfo* pTblMeta = NULL; - int32_t res = tqGetTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, &pTblMeta); + STableSinkInfo* pTableSinkInfo = NULL; + int32_t res = tqGetTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, &pTableSinkInfo); if (res != TSDB_CODE_SUCCESS) { - pTblMeta = taosMemoryCalloc(1, sizeof(STblInfo)); + pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo)); } char* ctbName = pDataBlock->info.parTbName; if (!ctbName[0]) { if (res == TSDB_CODE_SUCCESS) { - memcpy(ctbName, pTblMeta->tbName, strlen(pTblMeta->tbName)); + memcpy(ctbName, pTableSinkInfo->tbName, strlen(pTableSinkInfo->tbName)); } else { char* tmp = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId); memcpy(ctbName, tmp, strlen(tmp)); - memcpy(pTblMeta->tbName, tmp, strlen(tmp)); + memcpy(pTableSinkInfo->tbName, tmp, strlen(tmp)); taosMemoryFree(tmp); - tqDebug("vgId:%d, gropuid:%" PRIu64 " datablock tabel name is null", TD_VID(pVnode), + tqDebug("vgId:%d, gropuId:%" PRIu64 " datablock table name is null", TD_VID(pVnode), pDataBlock->info.id.groupId); } } if (res == TSDB_CODE_SUCCESS) { - tbData.uid = pTblMeta->uid; + tbData.uid = pTableSinkInfo->uid; } else { SMetaReader mr = {0}; metaReaderInit(&mr, pVnode->pMeta, 0); if (metaGetTableEntryByName(&mr, ctbName) < 0) { metaReaderClear(&mr); - taosMemoryFree(pTblMeta); + taosMemoryFree(pTableSinkInfo); tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName); SVCreateTbReq* pCreateTbReq = NULL; @@ -371,7 +373,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName, mr.me.type); metaReaderClear(&mr); - taosMemoryFree(pTblMeta); + taosMemoryFree(pTableSinkInfo); continue; } @@ -380,13 +382,16 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d ", actual suid %" PRId64 "", TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid); metaReaderClear(&mr); - taosMemoryFree(pTblMeta); + taosMemoryFree(pTableSinkInfo); continue; } tbData.uid = mr.me.uid; - pTblMeta->uid = mr.me.uid; - tqPutTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, pTblMeta); + pTableSinkInfo->uid = mr.me.uid; + int32_t code = tqPutTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, pTableSinkInfo); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFreeClear(pTableSinkInfo); + } metaReaderClear(&mr); } } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 704f4695fb..b5308d8b98 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -15,10 +15,11 @@ #include "tq.h" -#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0) -#define NO_POLL_CNT 5 +#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0) +#define NO_POLL_CNT 5 -static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp, int32_t vgId); +static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, + const SMqMetaRsp* pRsp, int32_t vgId); char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { char buf[128] = {0}; @@ -103,7 +104,8 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand char formatBuf[80]; tFormatOffset(formatBuf, 80, pOffsetVal); - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue. reqId:0x%"PRIx64, + tqDebug("tmq poll: consumer:0x%" PRIx64 + ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue. reqId:0x%" PRIx64, consumerId, pHandle->subKey, vgId, formatBuf, pRequest->reqId); return 0; } else { @@ -152,7 +154,8 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand return code; } } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) { - tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64 " in vg %d, subkey %s, reset none failed", + tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64 + " in vg %d, subkey %s, reset none failed", pHandle->subKey, consumerId, vgId, pRequest->subKey); terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; return -1; @@ -196,24 +199,25 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, // NOTE: this pHandle->consumerId may have been changed already. code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId); - end: - { - char buf[80] = {0}; - tFormatOffset(buf, 80, &dataRsp.rspOffset); - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64 " code:%d", - consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code); - tDeleteMqDataRsp(&dataRsp); - } +end : { + char buf[80] = {0}; + tFormatOffset(buf, 80, &dataRsp.rspOffset); + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64 + " code:%d", + consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code); + tDeleteMqDataRsp(&dataRsp); +} return code; } -static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal *offset) { - int code = 0; - int32_t vgId = TD_VID(pTq->pVnode); - SWalCkHead* pCkHead = NULL; - SMqMetaRsp metaRsp = {0}; - STaosxRsp taosxRsp = {0}; +static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, + SRpcMsg* pMsg, STqOffsetVal* offset) { + int code = 0; + int32_t vgId = TD_VID(pTq->pVnode); + SWalCkHead* pCkHead = NULL; + SMqMetaRsp metaRsp = {0}; + STaosxRsp taosxRsp = {0}; tqInitTaosxRsp(&taosxRsp, pRequest); if (offset->type != TMQ_OFFSET__LOG) { @@ -272,12 +276,12 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, } SWalCont* pHead = &pCkHead->head; - tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", pRequest->consumerId, - pRequest->epoch, vgId, fetchVer, pHead->msgType); + tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", + pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType); // process meta if (pHead->msgType != TDMT_VND_SUBMIT) { - if(totalRows > 0) { + if (totalRows > 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); goto end; @@ -301,7 +305,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, code = tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows); if (code < 0) { - tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId, pRequest->subKey); + tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId, + pRequest->subKey); goto end; } @@ -340,7 +345,7 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ if (blockReturned) { return 0; } - } else { // use the consumer specified offset + } else { // use the consumer specified offset // the offset value can not be monotonious increase?? offset = reqOffset; } @@ -348,7 +353,7 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ // this is a normal subscribe requirement if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &offset); - } else { // todo handle the case where re-balance occurs. + } else { // todo handle the case where re-balance occurs. // for taosx return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &offset); } @@ -363,7 +368,8 @@ static void initMqRspHead(SMqRspHead* pMsgHead, int32_t type, int32_t epoch, int pMsgHead->walever = ever; } -int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp, int32_t vgId) { +int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp, + int32_t vgId) { int32_t len = 0; int32_t code = 0; tEncodeSize(tEncodeMqMetaRsp, pRsp, len, code); @@ -387,7 +393,7 @@ int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPoll tEncodeMqMetaRsp(&encoder, pRsp); tEncoderClear(&encoder); - SRpcMsg resp = { .info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0 }; + SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0}; tmsgSendRsp(&resp); tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type %d, offset type:%d", vgId, @@ -397,7 +403,7 @@ int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPoll } int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, - int32_t type, int64_t sver, int64_t ever) { + int32_t type, int64_t sver, int64_t ever) { int32_t len = 0; int32_t code = 0; @@ -432,7 +438,7 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* } tEncoderClear(&encoder); - SRpcMsg rsp = { .info = *pRpcHandleInfo, .pCont = buf, .contLen = tlen, .code = 0 }; + SRpcMsg rsp = {.info = *pRpcHandleInfo, .pCont = buf, .contLen = tlen, .code = 0}; tmsgSendRsp(&rsp); return 0; diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index bac7099f9e..2daeb70260 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -299,7 +299,6 @@ typedef struct SPartitionBySupporter { typedef struct SPartitionDataInfo { uint64_t groupId; char* tbname; - SArray* tags; SArray* rowIds; } SPartitionDataInfo; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 7ad8821ff9..f2eee82579 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -1215,6 +1215,11 @@ SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) { return pBlock; } +void freePartItem(void* ptr) { + SPartitionDataInfo* pPart = (SPartitionDataInfo*)ptr; + taosArrayDestroy(pPart->rowIds); +} + SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo) { int32_t code = TSDB_CODE_SUCCESS; @@ -1293,6 +1298,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pPartitions = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); + taosHashSetFreeFp(pInfo->pPartitions, freePartItem); pInfo->tsColIndex = 0; pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 652825165c..7a9efde4f3 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2674,6 +2674,29 @@ TSKEY compareTs(void* pKey) { return pWinKey->ts; } +int32_t getSelectivityBufSize(SqlFunctionCtx* pCtx) { + if (pCtx->subsidiaries.rowLen == 0) { + int32_t rowLen = 0; + for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) { + SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j]; + rowLen += pc->pExpr->base.resSchema.bytes; + } + + return rowLen + pCtx->subsidiaries.num * sizeof(bool); + } else { + return pCtx->subsidiaries.rowLen; + } +} + +int32_t getMaxFunResSize(SExprSupp* pSup, int32_t numOfCols) { + int32_t size = 0; + for (int32_t i = 0; i < numOfCols; ++i) { + int32_t resSize = getSelectivityBufSize(pSup->pCtx + i); + size = TMAX(size, resSize); + } + return size; +} + SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild) { SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; @@ -2720,8 +2743,11 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); initBasicInfo(&pInfo->binfo, pResBlock); + pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); + *(pInfo->pState) = *(pTaskInfo->streamInfo.pState); + streamStateSetNumber(pInfo->pState, -1); int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, - pTaskInfo->streamInfo.pState); + pInfo->pState); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -2730,10 +2756,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); - pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); - *(pInfo->pState) = *(pTaskInfo->streamInfo.pState); - streamStateSetNumber(pInfo->pState, -1); - initResultRowInfo(&pInfo->binfo.resultRowInfo); pInfo->numOfChild = numOfChild; @@ -2766,7 +2788,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->numOfDatapack = 0; pInfo->pUpdated = NULL; pInfo->pUpdatedMap = NULL; - pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, + int32_t funResSize= getMaxFunResSize(&pOperator->exprSupp, numOfCols); + pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, pInfo->twAggSup.deleteMark); pInfo->dataVersion = 0; @@ -4889,9 +4912,13 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; initResultSizeInfo(&pOperator->resultInfo, 4096); + pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); + *(pInfo->pState) = *(pTaskInfo->streamInfo.pState); + streamStateSetNumber(pInfo->pState, -1); + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, - pTaskInfo->streamInfo.pState); + pInfo->pState); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -4912,10 +4939,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); initResultRowInfo(&pInfo->binfo.resultRowInfo); - pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); - *(pInfo->pState) = *(pTaskInfo->streamInfo.pState); - streamStateSetNumber(pInfo->pState, -1); - pInfo->pPhyNode = NULL; // create new child pInfo->pPullDataMap = NULL; pInfo->pPullWins = NULL; // SPullWindowInfo @@ -4928,7 +4951,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->numOfDatapack = 0; pInfo->pUpdated = NULL; pInfo->pUpdatedMap = NULL; - pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, + int32_t funResSize= getMaxFunResSize(pSup, numOfCols); + pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, pInfo->twAggSup.deleteMark); setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 80b26bd39b..1a43802e6b 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -883,10 +883,6 @@ int32_t setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STu } pStart += pDstCol->info.bytes; } - - if (pCtx->saveHandle.pState) { - streamFreeVal((void*)p); - } } return TSDB_CODE_SUCCESS; @@ -3123,7 +3119,7 @@ void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsid return buf; } -static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STupleKey* key, +static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, SWinKey* key, STuplePos* pPos) { STuplePos p = {0}; if (pHandle->pBuf != NULL) { @@ -3171,7 +3167,7 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) { prepareBuf(pCtx); - STupleKey key; + SWinKey key; if (pCtx->saveHandle.pBuf == NULL) { SColumnInfoData* pColInfo = taosArrayGet(pSrcBlock->pDataBlock, 0); if (pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) { @@ -3179,7 +3175,6 @@ int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* key.groupId = pSrcBlock->info.id.groupId; key.ts = skey; - key.exprIdx = pCtx->exprIdx; } } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index dbec1c95a7..26e1c2ab43 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -16,10 +16,10 @@ #include "streamInc.h" #include "ttimer.h" -#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 20480 +#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 20480 #define STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE (50) #define ONE_MB_F (1048576.0) -#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q)/ONE_MB_F) +#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F) int32_t streamInit() { int8_t old; @@ -96,8 +96,8 @@ int32_t streamSetupTrigger(SStreamTask* pTask) { } int32_t streamSchedExec(SStreamTask* pTask) { - int8_t schedStatus = - atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__WAITING); + int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE, + TASK_SCHED_STATUS__WAITING); if (schedStatus == TASK_SCHED_STATUS__INACTIVE) { SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); @@ -111,7 +111,7 @@ int32_t streamSchedExec(SStreamTask* pTask) { pRunReq->streamId = pTask->id.streamId; pRunReq->taskId = pTask->id.taskId; - SRpcMsg msg = { .msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq) }; + SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg); qDebug("trigger to run s-task:%s", pTask->id.idStr); } @@ -283,7 +283,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S } bool tInputQueueIsFull(const SStreamTask* pTask) { - bool isFull = taosQueueItemSize((pTask->inputQueue->queue)) >= STREAM_TASK_INPUT_QUEUEU_CAPACITY; + bool isFull = taosQueueItemSize((pTask->inputQueue->queue)) >= STREAM_TASK_INPUT_QUEUEU_CAPACITY; double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); return (isFull || size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE); } @@ -294,23 +294,26 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); if (type == STREAM_INPUT__DATA_SUBMIT) { - SStreamDataSubmit2* pSubmitBlock = (SStreamDataSubmit2*)pItem; - qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, - pSubmitBlock->submit.msgLen, pSubmitBlock->submit.ver, total, size); + int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; + double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); - if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) { - qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, - STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, - total, size); - streamDataSubmitDestroy(pSubmitBlock); - taosFreeQitem(pSubmitBlock); + SStreamDataSubmit* px = (SStreamDataSubmit*)pItem; + qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, + px->submit.msgLen, px->submit.ver, numOfBlocks, size); + + if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && tInputQueueIsFull(pTask)) { + qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort", + pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks, + size); + streamDataSubmitDestroy(px); + taosFreeQitem(pItem); return -1; } - - taosWriteQitem(pTask->inputQueue->queue, pSubmitBlock); + taosWriteQitem(pTask->inputQueue->queue, pItem); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; + double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) { qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", @@ -331,10 +334,6 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE); } -#if 0 - atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL); -#endif - return 0; } @@ -356,6 +355,4 @@ void* streamQueueNextItem(SStreamQueue* queue) { } } -void streamTaskInputFail(SStreamTask* pTask) { - atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); -} \ No newline at end of file +void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); } \ No newline at end of file diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 1da249b5a2..b70a8c93a9 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -67,8 +67,8 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock return 0; } -SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type) { - SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, submit.msgLen); +SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) { + SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, pData->msgLen); if (pDataSubmit == NULL) { return NULL; } @@ -79,14 +79,14 @@ SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type) { return NULL; } - pDataSubmit->submit = submit; + pDataSubmit->submit = *pData; *pDataSubmit->dataRef = 1; // initialize the reference count to be 1 pDataSubmit->type = type; return pDataSubmit; } -void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit) { +void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) { int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1); ASSERT(ref >= 0 && pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT); @@ -96,8 +96,8 @@ void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit) { } } -SStreamMergedSubmit2* streamMergedSubmitNew() { - SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)taosAllocateQitem(sizeof(SStreamMergedSubmit2), DEF_QITEM, 0); +SStreamMergedSubmit* streamMergedSubmitNew() { + SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM, 0); if (pMerged == NULL) { return NULL; } @@ -116,30 +116,30 @@ SStreamMergedSubmit2* streamMergedSubmitNew() { return pMerged; } -int32_t streamMergeSubmit(SStreamMergedSubmit2* pMerged, SStreamDataSubmit2* pSubmit) { +int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) { taosArrayPush(pMerged->dataRefs, &pSubmit->dataRef); taosArrayPush(pMerged->submits, &pSubmit->submit); pMerged->ver = pSubmit->ver; return 0; } -static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit2* pDataSubmit) { +static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) { atomic_add_fetch_32(pDataSubmit->dataRef, 1); } -SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit) { +SStreamDataSubmit* streamSubmitBlockClone(SStreamDataSubmit* pSubmit) { int32_t len = 0; if (pSubmit->type == STREAM_INPUT__DATA_SUBMIT) { len = pSubmit->submit.msgLen; } - SStreamDataSubmit2* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, len); + SStreamDataSubmit* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, len); if (pSubmitClone == NULL) { return NULL; } streamDataSubmitRefInc(pSubmit); - memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit2)); + memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit)); return pSubmitClone; } @@ -152,17 +152,17 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* taosFreeQitem(pElem); return dst; } else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) { - SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)dst; - SStreamDataSubmit2* pBlockSrc = (SStreamDataSubmit2*)pElem; + SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst; + SStreamDataSubmit* pBlockSrc = (SStreamDataSubmit*)pElem; streamMergeSubmit(pMerged, pBlockSrc); taosFreeQitem(pElem); return dst; } else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) { - SStreamMergedSubmit2* pMerged = streamMergedSubmitNew(); + SStreamMergedSubmit* pMerged = streamMergedSubmitNew(); // todo handle error - streamMergeSubmit(pMerged, (SStreamDataSubmit2*)dst); - streamMergeSubmit(pMerged, (SStreamDataSubmit2*)pElem); + streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst); + streamMergeSubmit(pMerged, (SStreamDataSubmit*)pElem); taosFreeQitem(dst); taosFreeQitem(pElem); return (SStreamQueueItem*)pMerged; @@ -180,10 +180,10 @@ void streamFreeQitem(SStreamQueueItem* data) { taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes); taosFreeQitem(data); } else if (type == STREAM_INPUT__DATA_SUBMIT) { - streamDataSubmitDestroy((SStreamDataSubmit2*)data); + streamDataSubmitDestroy((SStreamDataSubmit*)data); taosFreeQitem(data); } else if (type == STREAM_INPUT__MERGED_SUBMIT) { - SStreamMergedSubmit2* pMerge = (SStreamMergedSubmit2*)data; + SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data; int32_t sz = taosArrayGetSize(pMerge->submits); for (int32_t i = 0; i < sz; i++) { int32_t* pRef = taosArrayGetP(pMerge->dataRefs, i); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index e05660da32..d14e8102d8 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -51,7 +51,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK); } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); - const SStreamDataSubmit2* pSubmit = (const SStreamDataSubmit2*)data; + const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)data; qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit, pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); @@ -63,7 +63,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer); qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK); } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { - const SStreamMergedSubmit2* pMerged = (const SStreamMergedSubmit2*)data; + const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)data; SArray* pBlockList = pMerged->submits; int32_t numOfBlocks = taosArrayGetSize(pBlockList); @@ -149,7 +149,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { int32_t batchCnt = 0; while (1) { if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { - taosArrayDestroy(pRes); + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return 0; } @@ -269,6 +269,9 @@ int32_t streamExecForAll(SStreamTask* pTask) { qDebug("s-task:%s start to extract data block from inputQ", pTask->id.idStr); while (1) { + if (streamTaskShouldPause(&pTask->status)) { + return 0; + } SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); if (qItem == NULL) { if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) { @@ -367,11 +370,11 @@ int32_t streamExecForAll(SStreamTask* pTask) { qRes->blocks = pRes; if (((SStreamQueueItem*)pInput)->type == STREAM_INPUT__DATA_SUBMIT) { - SStreamDataSubmit2* pSubmit = (SStreamDataSubmit2*)pInput; + SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pInput; qRes->childId = pTask->selfChildId; qRes->sourceVer = pSubmit->ver; } else if (((SStreamQueueItem*)pInput)->type == STREAM_INPUT__MERGED_SUBMIT) { - SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)pInput; + SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pInput; qRes->childId = pTask->selfChildId; qRes->sourceVer = pMerged->ver; } @@ -408,7 +411,8 @@ int32_t streamTryExec(SStreamTask* pTask) { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); qDebug("s-task:%s exec completed", pTask->id.idStr); - if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) && (!streamTaskShouldPause(&pTask->status))) { + if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) && + (!streamTaskShouldPause(&pTask->status))) { streamSchedExec(pTask); } } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index d2b2c7d840..4cfeedab57 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -106,3 +106,61 @@ SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) { return (SStreamQueueRes){0}; } #endif + +#define MAX_STREAM_EXEC_BATCH_NUM 128 +#define MIN_STREAM_EXEC_BATCH_NUM 16 + +// todo refactor: +// read data from input queue +typedef struct SQueueReader { + SStreamQueue* pQueue; + int32_t taskLevel; + int32_t maxBlocks; // maximum block in one batch + int32_t waitDuration; // maximum wait time to format several block into a batch to process, unit: ms +} SQueueReader; + +SStreamQueueItem* doReadMultiBlocksFromQueue(SQueueReader* pReader, const char* idstr) { + int32_t numOfBlocks = 0; + int32_t tryCount = 0; + SStreamQueueItem* pRet = NULL; + + while (1) { + SStreamQueueItem* qItem = streamQueueNextItem(pReader->pQueue); + if (qItem == NULL) { + if (pReader->taskLevel == TASK_LEVEL__SOURCE && numOfBlocks < MIN_STREAM_EXEC_BATCH_NUM && tryCount < pReader->waitDuration) { + tryCount++; + taosMsleep(1); + qDebug("===stream===try again batchSize:%d", numOfBlocks); + continue; + } + + qDebug("===stream===break batchSize:%d", numOfBlocks); + break; + } + + if (pRet == NULL) { + pRet = qItem; + streamQueueProcessSuccess(pReader->pQueue); + if (pReader->taskLevel == TASK_LEVEL__SINK) { + break; + } + } else { + // todo we need to sort the data block, instead of just appending into the array list. + void* newRet = NULL; + if ((newRet = streamMergeQueueItem(pRet, qItem)) == NULL) { + streamQueueProcessFail(pReader->pQueue); + break; + } else { + numOfBlocks++; + pRet = newRet; + streamQueueProcessSuccess(pReader->pQueue); + if (numOfBlocks > pReader->maxBlocks) { + qDebug("maximum blocks limit:%d reached, processing, %s", pReader->maxBlocks, idstr); + break; + } + } + } + } + + return pRet; +} diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 373cb27941..98b685d8b9 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -272,26 +272,30 @@ int32_t streamStateCommit(SStreamState* pState) { #endif } -int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) { +int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { #ifdef USE_ROCKSDB - return streamStateFuncPut_rocksdb(pState, key, value, vLen); + void* pVal = NULL; + int32_t len = 0; + int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len); + char* buf = ((SRowBuffPos*)pVal)->pRowBuff; + uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState); + memcpy(buf + len - rowSize, value, vLen); + return code; #else return tdbTbUpsert(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, pState->pTdbState->txn); #endif } -int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) { +int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) { #ifdef USE_ROCKSDB - return streamStateFuncGet_rocksdb(pState, key, pVal, pVLen); + void* pVal = NULL; + int32_t len = 0; + int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len); + char* buf = ((SRowBuffPos*)pVal)->pRowBuff; + uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState); + *ppVal = buf + len - rowSize; + return code; #else - return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pVal, pVLen); -#endif -} - -int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) { -#ifdef USE_ROCKSDB - return streamStateFuncDel_rocksdb(pState, key); -#else - return tdbTbDelete(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pState->pTdbState->txn); + return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), ppVal, pVLen); #endif } diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 67835e77b8..ad66bb5a27 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -32,6 +32,7 @@ struct SStreamFileState { SSHashObj* rowBuffMap; void* pFileStore; int32_t rowSize; + int32_t selectivityRowSize; int32_t keyLen; uint64_t preCheckPointVersion; uint64_t checkPointVersion; @@ -45,8 +46,8 @@ struct SStreamFileState { typedef SRowBuffPos SRowBuffInfo; -SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, GetTsFun fp, void* pFile, - TSKEY delMark) { +SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, + GetTsFun fp, void* pFile, TSKEY delMark) { if (memSize <= 0) { memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE; } @@ -58,6 +59,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ if (!pFileState) { goto _error; } + rowSize += selectRowSize; pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2); pFileState->usedBuffs = tdListNew(POINTER_BYTES); pFileState->freeBuffs = tdListNew(POINTER_BYTES); @@ -69,11 +71,11 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ } pFileState->keyLen = keySize; pFileState->rowSize = rowSize; + pFileState->selectivityRowSize = selectRowSize; pFileState->preCheckPointVersion = 0; pFileState->checkPointVersion = 1; pFileState->pFileStore = pFile; pFileState->getTs = fp; - pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2); pFileState->curRowCount = 0; pFileState->deleteMark = delMark; pFileState->flushMark = INT64_MIN; @@ -441,9 +443,12 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { int32_t recoverSnapshot(SStreamFileState* pFileState) { int32_t code = TSDB_CODE_SUCCESS; - int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs) ? INT64_MIN - : pFileState->maxTs - pFileState->deleteMark; - deleteExpiredCheckPoint(pFileState, mark); + if (pFileState->maxTs != INT64_MIN) { + int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs) + ? INT64_MIN + : pFileState->maxTs - pFileState->deleteMark; + deleteExpiredCheckPoint(pFileState, mark); + } void* pStVal = NULL; int32_t len = 0; @@ -478,4 +483,6 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) { streamStateFreeCur(pCur); return TSDB_CODE_SUCCESS; -} \ No newline at end of file +} + +int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState) { return pFileState->selectivityRowSize; } diff --git a/tests/script/tsim/stream/pauseAndResume.sim b/tests/script/tsim/stream/pauseAndResume.sim index fa7be19310..402e0086f7 100644 --- a/tests/script/tsim/stream/pauseAndResume.sim +++ b/tests/script/tsim/stream/pauseAndResume.sim @@ -307,4 +307,63 @@ sql resume stream IF EXISTS streams66666666; print ===== step 4 over +print ===== step5 +sql drop stream if exists streams6; +sql drop database if exists test6; +sql create database test6 vgroups 10; +sql use test6; +sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int); +sql create table ts1 using st tags(1,1,1); +sql create table ts2 using st tags(2,2,2); +sql create table ts3 using st tags(3,2,2); +sql create table ts4 using st tags(4,2,2); +sql create stream streams6 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt6 as select _wstart, count(*) c1 from st interval(10s); + +sql insert into ts1 values(1648791213001,1,12,3,1.0); +sql insert into ts2 values(1648791213001,1,12,3,1.0); + +sql insert into ts3 values(1648791213001,1,12,3,1.0); +sql insert into ts4 values(1648791213001,1,12,3,1.0); + +sleep 1000 + +sql pause stream streams6; + +sleep 1000 + + +sql insert into ts1 values(1648791223001,1,12,3,1.0); +sql insert into ts2 values(1648791233001,1,12,3,1.0); + +sql resume stream streams6; + +sql insert into ts3 values(1648791243001,1,12,3,1.0); +sql insert into ts4 values(1648791253001,1,12,3,1.0); + +$loop_count = 0 +loop6: + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sleep 500 + +print 2 select * from streamt6; +sql select * from streamt6; + +if $rows != 5 then + print =====rows=$rows + print $data00 $data01 $data02 + print $data10 $data11 $data12 + print $data20 $data21 $data22 + print $data30 $data31 $data32 + print $data40 $data41 $data42 + print $data50 $data51 $data52 + goto loop6 +endi + +print ===== step5 over + system sh/stop_dnodes.sh