diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 30b2fb74ca..d6e4b86097 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -167,9 +167,9 @@ int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey) int32_t tqOffsetCommitFile(STqOffsetStore* pStore); // tqSink -int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBlock* pDataBlock, - SBatchDeleteReq* deleteReq); -void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* data); +int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, + const char* pIdStr); +void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data); // tqOffset char* tqOffsetBuildFName(const char* path, int32_t fVer); diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index 6a4bddc991..3542ea9ffb 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -250,7 +250,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * if (pDataBlock->info.type == STREAM_DELETE_RESULT) { pDeleteReq->suid = suid; pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); - tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, pDeleteReq); + tqBuildDeleteReq(stbFullName, pDataBlock, pDeleteReq, ""); continue; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 29daeb1eef..8d9d5c5fa4 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -637,7 +637,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->smaSink.smaSink = smaHandleRes; } else if (pTask->outputType == TASK_OUTPUT__TABLE) { pTask->tbSink.vnode = pTq->pVnode; - pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline2; + pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline; int32_t ver1 = 1; SMetaInfo info = {0}; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 3cb691cfb5..9373f23c02 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -17,23 +17,24 @@ #include "tmsg.h" #include "tq.h" -int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBlock* pDataBlock, - SBatchDeleteReq* deleteReq) { - int32_t totRow = pDataBlock->info.rows; +int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, + const char* pIdStr) { + int32_t totalRows = pDataBlock->info.rows; SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pEndTsCol = taosArrayGet(pDataBlock->pDataBlock, END_TS_COLUMN_INDEX); SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); - tqDebug("stream delete msg: row %d", totRow); + tqDebug("s-task:%s build %d rows delete msg for table:%s", pIdStr, totalRows, stbFullName); - for (int32_t row = 0; row < totRow; row++) { - int64_t startTs = *(int64_t*)colDataGetData(pStartTsCol, row); - int64_t endTs = *(int64_t*)colDataGetData(pEndTsCol, row); + for (int32_t row = 0; row < totalRows; row++) { + int64_t skey = *(int64_t*)colDataGetData(pStartTsCol, row); + int64_t ekey = *(int64_t*)colDataGetData(pEndTsCol, row); int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row); + char* name; void* varTbName = NULL; - if (!colDataIsNull(pTbNameCol, totRow, row, NULL)) { + if (!colDataIsNull(pTbNameCol, totalRows, row, NULL)) { varTbName = colDataGetVarData(pTbNameCol, row); } @@ -43,31 +44,17 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl } else { name = buildCtbNameByGroupId(stbFullName, groupId); } - tqDebug("stream delete msg: vgId:%d, groupId :%" PRId64 ", name: %s, start ts:%" PRId64 "end ts:%" PRId64, - pVnode->config.vgId, groupId, name, startTs, endTs); -#if 0 - SMetaReader mr = {0}; - metaReaderInit(&mr, pVnode->pMeta, 0); - if (metaGetTableEntryByName(&mr, name) < 0) { - metaReaderClear(&mr); - tqDebug("stream delete msg, skip vgId:%d since no table: %s", pVnode->config.vgId, name); - taosMemoryFree(name); - continue; - } - int64_t uid = mr.me.uid; - metaReaderClear(&mr); - taosMemoryFree(name); -#endif - SSingleDeleteReq req = { - .startTs = startTs, - .endTs = endTs, - }; + tqDebug("s-task:%s build delete msg groupId:%" PRId64 ", name:%s, skey:%" PRId64 " ekey:%" PRId64, + pIdStr, groupId, name, skey, ekey); + + SSingleDeleteReq req = { .startTs = skey, .endTs = ekey}; strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN - 1); taosMemoryFree(name); - /*tqDebug("stream delete msg, active: vgId:%d, ts:%" PRId64 " name:%s", pVnode->config.vgId, ts, name);*/ + taosArrayPush(deleteReq->deleteReqs, &req); } + return 0; } @@ -108,12 +95,7 @@ int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) { int32_t tlen = 0; encodeCreateChildTableForRPC(pReqs, TD_VID(pVnode), &buf, &tlen); - SRpcMsg msg = { - .msgType = TDMT_VND_CREATE_TABLE, - .pCont = buf, - .contLen = tlen, - }; - + SRpcMsg msg = { .msgType = TDMT_VND_CREATE_TABLE, .pCont = buf, .contLen = tlen }; if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { tqError("failed to put into write-queue since %s", terrstr()); } @@ -121,13 +103,12 @@ int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) { return TSDB_CODE_SUCCESS; } -void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { +void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { const SArray* pBlocks = (const SArray*)data; SVnode* pVnode = (SVnode*)vnode; int64_t suid = pTask->tbSink.stbUid; char* stbFullName = pTask->tbSink.stbFullName; STSchema* pTSchema = pTask->tbSink.pTSchema; - /*SSchemaWrapper* pSchemaWrapper = pTask->tbSink.pSchemaWrapper;*/ int32_t blockSz = taosArrayGetSize(pBlocks); @@ -141,11 +122,11 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* for (int32_t i = 0; i < blockSz; i++) { SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); int32_t rows = pDataBlock->info.rows; + if (pDataBlock->info.type == STREAM_DELETE_RESULT) { - SBatchDeleteReq deleteReq = {0}; - deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); - deleteReq.suid = suid; - tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, &deleteReq); + SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))}; + + tqBuildDeleteReq(stbFullName, pDataBlock, &deleteReq, pTask->id.idStr); if (taosArrayGetSize(deleteReq.deleteReqs) == 0) { taosArrayDestroy(deleteReq.deleteReqs); continue; @@ -154,10 +135,10 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* int32_t len; int32_t code; tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code); - if (code < 0) { - // - ASSERT(0); + if (code != TSDB_CODE_SUCCESS) { + qError("s-task:%s failed to encode delete request", pTask->id.idStr); } + SEncoder encoder; void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead)); void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead)); @@ -168,11 +149,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* ((SMsgHead*)serializedDeleteReq)->vgId = pVnode->config.vgId; - SRpcMsg msg = { - .msgType = TDMT_VND_BATCH_DEL, - .pCont = serializedDeleteReq, - .contLen = len + sizeof(SMsgHead), - }; + SRpcMsg msg = { .msgType = TDMT_VND_BATCH_DEL, .pCont = serializedDeleteReq, .contLen = len + sizeof(SMsgHead) }; if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { tqDebug("failed to put delete req into write-queue since %s", terrstr()); } @@ -182,6 +159,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* if (NULL == reqs.pArray) { goto _end; } + for (int32_t rowId = 0; rowId < rows; rowId++) { SVCreateTbReq createTbReq = {0}; SVCreateTbReq* pCreateTbReq = &createTbReq; @@ -204,11 +182,13 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* tdDestroySVCreateTbReq(pCreateTbReq); goto _end; } + STagVal tagVal = { .cid = pTSchema->numOfCols + 1, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = (int64_t)pDataBlock->info.id.groupId, }; + taosArrayPush(tagArray, &tagVal); // set tag name @@ -271,7 +251,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* crTblArray = NULL; } else { SSubmitTbData tbData = {0}; - tqDebug("tq sink pipe2, convert block1 %d, rows: %d", i, rows); + tqDebug("tq sink pipe, convert block1 %d, rows: %d", i, rows); if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) { goto _end; @@ -405,8 +385,8 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* } else { void* colData = colDataGetData(pColData, j); if (IS_STR_DATA_TYPE(pCol->type)) { - SValue sv = - (SValue){.nData = varDataLen(colData), .pData = varDataVal(colData)}; // address copy, no value + // address copy, no value + SValue sv = (SValue){.nData = varDataLen(colData), .pData = varDataVal(colData)}; SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv); taosArrayPush(pVals, &cv); } else {