diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 675bfa334a..238407b26c 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -160,7 +160,7 @@ int32_t tqResetStreamTaskStatus(STQ* pTq); int32_t tqStopStreamTasks(STQ* pTq); // tq util -int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock); +int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int8_t type); int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg); int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, int32_t type, int64_t sver, int64_t ever); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 137c2f4f7e..df1720d4a7 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -286,8 +286,8 @@ int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); int32_t tdProcessRSmaCreate(SSma* pSma, SVCreateStbReq* pReq); -int32_t tdProcessRSmaSubmit(SSma* pSma, int64_t version, void* pReq, void* pMsg, int32_t len, int32_t inputType); -int32_t tdProcessRSmaDelete(SSma* pSma, int64_t version, void* pReq, void* pMsg, int32_t len, int32_t inputType); +int32_t tdProcessRSmaSubmit(SSma* pSma, int64_t version, void* pReq, void* pMsg, int32_t len); +int32_t tdProcessRSmaDelete(SSma* pSma, int64_t version, void* pReq, void* pMsg, int32_t len); int32_t tdProcessRSmaDrop(SSma* pSma, SVDropStbReq* pReq); int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid); int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore, bool isAdd); diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 92b8c09fbc..3512f1476f 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -217,10 +217,7 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo) { int32_t lino = 0; SVnode *pVnode = pSma->pVnode; - SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); - if (!pSmaEnv) { - goto _exit; - } + if (!SMA_RSMA_ENV(pSma)) goto _exit; code = tsdbCommitBegin(VND_RSMA1(pVnode), pInfo); TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index dd21ec2b30..6742f30d53 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -43,7 +43,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSiz ERsmaExecType type, int8_t level); static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid); static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); -static void tdFreeRSmaSubmitItems(SArray *pItems); +static void tdFreeRSmaSubmitItems(SArray *pItems, int32_t type); static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo); static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo, int32_t execType, int8_t *streamFlushed); @@ -723,7 +723,7 @@ _exit: */ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType, SRSmaInfo *pInfo, tb_uid_t suid) { - int32_t size = RSMA_SUBMIT_HEAD_LEN + len; // header(type+len+version) + payload + int32_t size = RSMA_SUBMIT_HEAD_LEN + len; // header + payload void *qItem = taosAllocateQitem(size, DEF_QITEM, 0); if (!qItem) { @@ -940,12 +940,8 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg, return TSDB_CODE_SUCCESS; } -int32_t tdProcessRSmaSubmit(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len, int32_t inputType) { - SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); - if (!pEnv) { - // only applicable when rsma env exists - return TDB_CODE_SUCCESS; - } +int32_t tdProcessRSmaSubmit(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len) { + if (!SMA_RSMA_ENV(pSma)) return TSDB_CODE_SUCCESS; if ((terrno = atomic_load_32(&SMA_RSMA_STAT(pSma)->execStat))) { smaError("vgId:%d, failed to process rsma submit since invalid exec code: %s", SMA_VID(pSma), terrstr()); @@ -954,27 +950,25 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, int64_t version, void *pReq, void *pMsg, STbUidStore uidStore = {0}; - if (inputType == STREAM_INPUT__DATA_SUBMIT) { - if (tdFetchSubmitReqSuids(pReq, &uidStore) < 0) { - smaError("vgId:%d, failed to process rsma submit fetch suid since: %s", SMA_VID(pSma), terrstr()); + if (tdFetchSubmitReqSuids(pReq, &uidStore) < 0) { + smaError("vgId:%d, failed to process rsma submit fetch suid since: %s", SMA_VID(pSma), terrstr()); + goto _err; + } + + if (uidStore.suid != 0) { + if (tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__DATA_SUBMIT, uidStore.suid) < 0) { + smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), terrstr()); goto _err; } - if (uidStore.suid != 0) { - if (tdExecuteRSmaAsync(pSma, version, pMsg, len, inputType, uidStore.suid) < 0) { - smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), terrstr()); + void *pIter = NULL; + while ((pIter = taosHashIterate(uidStore.uidHash, pIter))) { + tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); + if (tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__DATA_SUBMIT, *pTbSuid) < 0) { + smaError("vgId:%d, failed to process rsma submit exec 2 since: %s", SMA_VID(pSma), terrstr()); + taosHashCancelIterate(uidStore.uidHash, pIter); goto _err; } - - void *pIter = NULL; - while ((pIter = taosHashIterate(uidStore.uidHash, pIter))) { - tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); - if (tdExecuteRSmaAsync(pSma, version, pMsg, len, inputType, *pTbSuid) < 0) { - smaError("vgId:%d, failed to process rsma submit exec 2 since: %s", SMA_VID(pSma), terrstr()); - taosHashCancelIterate(uidStore.uidHash, pIter); - goto _err; - } - } } } tdUidStoreDestory(&uidStore); @@ -984,19 +978,18 @@ _err: return terrno; } -int32_t tdProcessRSmaDelete(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len, int32_t inputType) { - SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); - if (!pEnv) { - // only applicable when rsma env exists - return TSDB_CODE_SUCCESS; +int32_t tdProcessRSmaDelete(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len) { + if (!SMA_RSMA_ENV(pSma)) return TSDB_CODE_SUCCESS; + + if ((terrno = atomic_load_32(&SMA_RSMA_STAT(pSma)->execStat))) { + smaError("vgId:%d, failed to process rsma delete since invalid exec code: %s", SMA_VID(pSma), terrstr()); + goto _err; } - if (inputType == STREAM_INPUT__REF_DATA_BLOCK) { - SDeleteRes *pReq = pReq; - if (tdExecuteRSmaAsync(pSma, version, pMsg, len, inputType, pReq->suid) < 0) { - smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), terrstr()); - goto _err; - } + SDeleteRes *pDelRes = pReq; + if (tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__REF_DATA_BLOCK, pDelRes->suid) < 0) { + smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), terrstr()); + goto _err; } return TSDB_CODE_SUCCESS; _err: @@ -1381,10 +1374,20 @@ _end: tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId); } -static void tdFreeRSmaSubmitItems(SArray *pItems) { - for (int32_t i = 0; i < taosArrayGetSize(pItems); ++i) { - SPackedData *packData = taosArrayGet(pItems, i); - taosFreeQitem(POINTER_SHIFT(packData->msgStr, -RSMA_SUBMIT_HEAD_LEN)); +static void tdFreeRSmaSubmitItems(SArray *pItems, int32_t type) { + int32_t arrSize = taosArrayGetSize(pItems); + if (type == STREAM_INPUT__MERGED_SUBMIT) { + for (int32_t i = 0; i < arrSize; ++i) { + SPackedData *packData = TARRAY_GET_ELEM(pItems, i); + taosFreeQitem(POINTER_SHIFT(packData->msgStr, -RSMA_SUBMIT_HEAD_LEN)); + } + } else if (type == STREAM_INPUT__REF_DATA_BLOCK) { + for (int32_t i = 0; i < arrSize; ++i) { + SPackedData *packData = TARRAY_GET_ELEM(pItems, i); + blockDataDestroy(packData->pDataBlock); + } + } else { + ASSERTS(0, "unknown type:%d", type); } taosArrayClear(pItems); } @@ -1448,34 +1451,37 @@ _err: return TSDB_CODE_FAILED; } +#define RSMA_SUBMIT_MSG_TYPE(msg) (*(int8_t *)(msg)) +#define RSMA_SUBMIT_MSG_LEN(msg) (*(int32_t *)POINTER_SHIFT((msg), sizeof(int8_t))) +#define RSMA_SUBMIT_MSG_VER(msg) (*(int64_t *)POINTER_SHIFT((msg), sizeof(int8_t) + sizeof(int32_t))) +#define RSMA_SUBMIT_MSG_BODY(msg) (POINTER_SHIFT((msg), sizeof(int8_t) + sizeof(int32_t) + sizeof(int64_t))) + static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SArray *pSubmitArr, ERsmaExecType type) { - - void *msg = NULL; - int8_t resume = 0; - int32_t nSubmit = 0; - int32_t nDelete = 0; + void *msg = NULL; + int8_t resume = 0; + int32_t nSubmit = 0; + int32_t nDelete = 0; SPackedData packData; taosArrayClear(pSubmitArr); + // the submitReq/deleteReq msg may exsit alternately in the msg queue, consume them sequentially in batch mode while (1) { taosGetQitem(qall, (void **)&msg); if (msg) { - int8_t inputType = *(int8_t *)msg; - - msg = POINTER_SHIFT(msg, sizeof(int8_t)); - + int8_t inputType = RSMA_SUBMIT_MSG_TYPE(msg); if (inputType == STREAM_INPUT__DATA_SUBMIT) { if (nDelete > 0) { resume = 1; break; } _resume_submit: - packData.msgLen = *(int32_t *)msg; - packData.ver = *(int64_t *)POINTER_SHIFT(msg, sizeof(int32_t)); - packData.msgStr = POINTER_SHIFT(msg, sizeof(int32_t) + sizeof(int64_t)); + packData.msgLen = RSMA_SUBMIT_MSG_LEN(msg); + packData.ver = RSMA_SUBMIT_MSG_VER(msg); + packData.msgStr = RSMA_SUBMIT_MSG_BODY(msg); if (!taosArrayPush(pSubmitArr, &packData)) { + taosFreeQitem(msg); terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } @@ -1486,46 +1492,49 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA break; } _resume_delete: - ++nDelete; -#if 0 - if (!taosArrayPush(pSubmitArr, &packData)) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tdFreeRSmaSubmitItems(pSubmitArr); - goto _err; - } -#endif - } else { - break; - } - } - - if (nSubmit > 0) { - int32_t size = taosArrayGetSize(pSubmitArr); - if (size > 0) { - for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { - if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, type, i) < 0) { + extractDelDataBlock(RSMA_SUBMIT_MSG_BODY(msg), RSMA_SUBMIT_MSG_LEN(msg), RSMA_SUBMIT_MSG_VER(msg), + &packData.pDataBlock, 1); + if (!taosArrayPush(pSubmitArr, &packData)) { + taosFreeQitem(msg); + terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } + taosFreeQitem(msg); + ++nDelete; + } else { + ASSERTS(0, "unknown msg type:%d", inputType); + break; } - tdFreeRSmaSubmitItems(pSubmitArr); } - } else if (nDelete > 0) { - } - if (resume == 0) { - goto _rtn; - } else if (resume == 1) { - nSubmit = 0; - nDelete = 0; - resume = 0; - taosArrayClear(pSubmitArr); - goto _resume_submit; - } else { - nSubmit = 0; - nDelete = 0; - resume = 0; - taosArrayClear(pSubmitArr); - goto _resume_delete; + if (nSubmit > 0 || nDelete > 0) { + int32_t size = TARRAY_SIZE(pSubmitArr); + if (size > 0) { + int32_t inputType = nSubmit > 0 ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK; + for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { + if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, inputType, pInfo, type, i) < 0) { + goto _err; + } + } + tdFreeRSmaSubmitItems(pSubmitArr, inputType); + } + } + + if (resume == 0) { + goto _rtn; + } else if (resume == 1) { + nSubmit = 0; + nDelete = 0; + resume = 0; + tdFreeRSmaSubmitItems(pSubmitArr, STREAM_INPUT__REF_DATA_BLOCK); + goto _resume_submit; + } else { + nSubmit = 0; + nDelete = 0; + resume = 0; + tdFreeRSmaSubmitItems(pSubmitArr, STREAM_INPUT__MERGED_SUBMIT); + goto _resume_delete; + } } _rtn: @@ -1534,7 +1543,7 @@ _err: atomic_store_32(&SMA_RSMA_STAT(pSma)->execStat, terrno); smaError("vgId:%d, batch exec for suid:%" PRIi64 " execType:%d size:%d failed since %s", SMA_VID(pSma), pInfo->suid, type, (int32_t)taosArrayGetSize(pSubmitArr), terrstr()); - tdFreeRSmaSubmitItems(pSubmitArr); + tdFreeRSmaSubmitItems(pSubmitArr, nSubmit ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK); while (1) { void *msg = NULL; taosGetQitem(qall, (void **)&msg); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index bd2a591a98..41b1aa7bd1 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -343,7 +343,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead)); int32_t len = pCont->bodyLen - sizeof(SMsgHead); - code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem); + code = extractDelDataBlock(pBody, len, ver, (void**)pItem, 0); if (code == TSDB_CODE_SUCCESS) { if (*pItem == NULL) { tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index d8fe899bf6..f7d0ad54e9 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -399,7 +399,7 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* return 0; } -int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock) { +int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int8_t type) { SDecoder* pCoder = &(SDecoder){0}; SDeleteRes* pRes = &(SDeleteRes){0}; @@ -442,14 +442,19 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream } taosArrayDestroy(pRes->uidList); - *pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); - if (*pRefBlock == NULL) { - blockDataCleanup(pDelBlock); - taosMemoryFree(pDelBlock); - return TSDB_CODE_OUT_OF_MEMORY; + if (type == 0) { + *pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); + if (*pRefBlock == NULL) { + blockDataCleanup(pDelBlock); + taosMemoryFree(pDelBlock); + return TSDB_CODE_OUT_OF_MEMORY; + } + + ((SStreamRefDataBlock*)(*pRefBlock))->type = STREAM_INPUT__REF_DATA_BLOCK; + ((SStreamRefDataBlock*)(*pRefBlock))->pBlock = pDelBlock; + } else { + *pRefBlock = pDelBlock; } - (*pRefBlock)->type = STREAM_INPUT__REF_DATA_BLOCK; - (*pRefBlock)->pBlock = pDelBlock; return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 44b6af4ff5..df42e02776 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -380,7 +380,7 @@ static int32_t vnodePreProcessDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) { SEncoder *pCoder = &(SEncoder){0}; SDeleteRes res = {0}; - SReadHandle handle = {.vnode = pVnode, .pMsgCb = &pVnode->msgCb}; + SReadHandle handle = {.vnode = pVnode, .pMsgCb = &pVnode->msgCb, .skipRollup = 1}; initStorageAPI(&handle.api); code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res); @@ -1669,7 +1669,7 @@ _exit: atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1); if (code == 0) { atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, 1); - code = tdProcessRSmaSubmit(pVnode->pSma, ver, pSubmitReq, pReq, len, STREAM_INPUT__DATA_SUBMIT); + code = tdProcessRSmaSubmit(pVnode->pSma, ver, pSubmitReq, pReq, len); } // clear @@ -1947,7 +1947,7 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, in if (code) goto _err; } - tdProcessRSmaDelete(pVnode->pSma, ver, pRes, pReq, len, STREAM_INPUT__REF_DATA_BLOCK); + tdProcessRSmaDelete(pVnode->pSma, ver, pRes, pReq, len); tDecoderClear(pCoder); taosArrayDestroy(pRes->uidList); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index d569f78322..1f82a9477b 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -78,12 +78,11 @@ static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOf taosArrayPush(pInfo->pBlockLists, &tmp); pInfo->blockType = STREAM_INPUT__CHECKPOINT; } else if (type == STREAM_INPUT__REF_DATA_BLOCK) { - for (int32_t i = 0; i < numOfBlocks; ++i) { - SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; - SPackedData tmp = {.pDataBlock = pDataBlock}; - taosArrayPush(pInfo->pBlockLists, &tmp); + for (int32_t i = 0; i < numOfBlocks; ++i) { + SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData)); + taosArrayPush(pInfo->pBlockLists, pReq); } - pInfo->blockType = STREAM_INPUT__REF_DATA_BLOCK; + pInfo->blockType = STREAM_INPUT__DATA_BLOCK; } return TSDB_CODE_SUCCESS;