diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index d93a2ac6a3..19825b438b 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1330,59 +1330,6 @@ int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) { return 0; } -int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock) { - SDecoder* pCoder = &(SDecoder){0}; - SDeleteRes* pRes = &(SDeleteRes){0}; - - (*pRefBlock) = NULL; - - pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t)); - if (pRes->uidList == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - tDecoderInit(pCoder, (uint8_t*)pData, len); - tDecodeDeleteRes(pCoder, pRes); - tDecoderClear(pCoder); - - int32_t numOfTables = taosArrayGetSize(pRes->uidList); - if (numOfTables == 0 || pRes->affectedRows == 0) { - taosArrayDestroy(pRes->uidList); - return TSDB_CODE_SUCCESS; - } - - SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); - blockDataEnsureCapacity(pDelBlock, numOfTables); - pDelBlock->info.rows = numOfTables; - pDelBlock->info.version = ver; - - for (int32_t i = 0; i < numOfTables; i++) { - // start key column - SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX); - colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false); // end key column - SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX); - colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false); - // uid column - SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX); - int64_t* pUid = taosArrayGet(pRes->uidList, i); - colDataSetVal(pUidCol, i, (const char*)pUid, false); - - colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX), i); - colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), i); - colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), i); - } - - taosArrayDestroy(pRes->uidList); - *pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); - if ((*pRefBlock) == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - (*pRefBlock)->type = STREAM_INPUT__REF_DATA_BLOCK; - (*pRefBlock)->pBlock = pDelBlock; - return TSDB_CODE_SUCCESS; -} - int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRunReq* pReq = pMsg->pCont; diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index a800d0ba1e..67fb818ff4 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -454,7 +454,3 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream (*pRefBlock)->pBlock = pDelBlock; return TSDB_CODE_SUCCESS; } - -int32_t tqCreateCheckpointBlock(SStreamCheckpoint** pCheckpointBlock) { - return 0; -} diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 9706bee5b6..dbf946d310 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -598,7 +598,7 @@ int32_t streamTaskReloadState(SStreamTask* pTask) { } int32_t streamAlignTransferState(SStreamTask* pTask) { - int32_t numOfUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList); + int32_t numOfUpstream = taosArrayGetSize(pTask->pUpstreamInfoList); int32_t old = atomic_val_compare_exchange_32(&pTask->transferStateAlignCnt, 0, numOfUpstream); if (old == 0) { qDebug("s-task:%s set the transfer state aligncnt %d", pTask->id.idStr, numOfUpstream);