From e66283d99ccbefe442f05630b1c164d19062a557 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 30 May 2024 18:41:17 +0800 Subject: [PATCH] fix(stream): remove table meta in table-meta cache. --- source/dnode/vnode/src/tq/tqSink.c | 153 ++++++++++++++++------------- 1 file changed, 85 insertions(+), 68 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 3fe124ea85..96ccf18504 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -39,6 +39,7 @@ static int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STab const char* dstTableName, int64_t* uid); static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id); +static int32_t doRemoveFromCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id); static bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid); static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName, int32_t numOfTags); @@ -396,46 +397,6 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c return TSDB_CODE_SUCCESS; } -int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, - int64_t suid) { - SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))}; - - int32_t code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, &deleteReq, pTask->id.idStr, - pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - if (taosArrayGetSize(deleteReq.deleteReqs) == 0) { - taosArrayDestroy(deleteReq.deleteReqs); - return TSDB_CODE_SUCCESS; - } - - int32_t len; - tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code); - if (code != TSDB_CODE_SUCCESS) { - qError("s-task:%s failed to encode delete request", pTask->id.idStr); - return code; - } - - SEncoder encoder; - void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead)); - void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead)); - tEncoderInit(&encoder, abuf, len); - tEncodeSBatchDeleteReq(&encoder, &deleteReq); - tEncoderClear(&encoder); - taosArrayDestroy(deleteReq.deleteReqs); - - ((SMsgHead*)serializedDeleteReq)->vgId = TD_VID(pVnode); - - SRpcMsg msg = {.msgType = TDMT_VND_BATCH_DEL, .pCont = serializedDeleteReq, .contLen = len + sizeof(SMsgHead)}; - if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { - tqDebug("failed to put delete req into write-queue since %s", terrstr()); - } - - return TSDB_CODE_SUCCESS; -} - bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid) { if (pReader->me.type != TSDB_CHILD_TABLE) { tqError("vgId:%d, failed to write into %s, since table type:%d incorrect", vgId, ctbName, pReader->me.type); @@ -484,23 +445,6 @@ SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, in return pCreateTbReq; } -int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id) { - if (tSimpleHashGetSize(pSinkTableMap) > MAX_CACHE_TABLE_INFO_NUM) { - taosMemoryFreeClear(pTableSinkInfo); // too many items, failed to cache it - return TSDB_CODE_FAILED; - } - - int32_t code = tSimpleHashPut(pSinkTableMap, &groupId, sizeof(uint64_t), &pTableSinkInfo, POINTER_BYTES); - if (code != TSDB_CODE_SUCCESS) { - taosMemoryFreeClear(pTableSinkInfo); - } else { - tqDebug("s-task:%s new dst table:%s(uid:%" PRIu64 ") added into cache, total:%d", id, pTableSinkInfo->name.data, - pTableSinkInfo->uid, tSimpleHashGetSize(pSinkTableMap)); - } - - return code; -} - int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen) { int32_t code = 0; void* pBuf = NULL; @@ -826,17 +770,6 @@ int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_ return code; } -bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) { - for (int32_t i = 0; i < numOfBlocks; ++i) { - SSDataBlock* p = taosArrayGet(pBlocks, i); - if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) { - return false; - } - } - - return true; -} - void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { const SArray* pBlocks = (const SArray*)data; SVnode* pVnode = (SVnode*)vnode; @@ -884,6 +817,11 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id); if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) { + if (tbData.pCreateTbReq != NULL) { + tdDestroySVCreateTbReq(tbData.pCreateTbReq); + doRemoveFromCache(pTask->outputInfo.tbSink.pTblInfo, pDataBlock->info.id.groupId, id); + tbData.pCreateTbReq = NULL; + } continue; } @@ -934,6 +872,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) { if (tbData.pCreateTbReq != NULL) { tdDestroySVCreateTbReq(tbData.pCreateTbReq); + doRemoveFromCache(pTask->outputInfo.tbSink.pTblInfo, groupId, id); tbData.pCreateTbReq = NULL; } continue; @@ -973,3 +912,81 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { } } } + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) { + for (int32_t i = 0; i < numOfBlocks; ++i) { + SSDataBlock* p = taosArrayGet(pBlocks, i); + if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) { + return false; + } + } + + return true; +} + +int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id) { + if (tSimpleHashGetSize(pSinkTableMap) > MAX_CACHE_TABLE_INFO_NUM) { + taosMemoryFreeClear(pTableSinkInfo); // too many items, failed to cache it + return TSDB_CODE_FAILED; + } + + int32_t code = tSimpleHashPut(pSinkTableMap, &groupId, sizeof(uint64_t), &pTableSinkInfo, POINTER_BYTES); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFreeClear(pTableSinkInfo); + } else { + tqDebug("s-task:%s new dst table:%s(uid:%" PRIu64 ") added into cache, total:%d", id, pTableSinkInfo->name.data, + pTableSinkInfo->uid, tSimpleHashGetSize(pSinkTableMap)); + } + + return code; +} + +int32_t doRemoveFromCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id) { + if (tSimpleHashGetSize(pSinkTableMap) == 0) { + return TSDB_CODE_SUCCESS; + } + + int32_t code = tSimpleHashRemove(pSinkTableMap, &groupId, sizeof(groupId)); + tqDebug("s-task:%s remove cached table meta for groupId:%"PRId64, id, groupId); +} + +int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, + int64_t suid) { + SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))}; + + int32_t code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, &deleteReq, pTask->id.idStr, + pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + if (taosArrayGetSize(deleteReq.deleteReqs) == 0) { + taosArrayDestroy(deleteReq.deleteReqs); + return TSDB_CODE_SUCCESS; + } + + int32_t len; + tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code); + if (code != TSDB_CODE_SUCCESS) { + qError("s-task:%s failed to encode delete request", pTask->id.idStr); + return code; + } + + SEncoder encoder; + void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead)); + void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead)); + tEncoderInit(&encoder, abuf, len); + tEncodeSBatchDeleteReq(&encoder, &deleteReq); + tEncoderClear(&encoder); + taosArrayDestroy(deleteReq.deleteReqs); + + ((SMsgHead*)serializedDeleteReq)->vgId = TD_VID(pVnode); + + SRpcMsg msg = {.msgType = TDMT_VND_BATCH_DEL, .pCont = serializedDeleteReq, .contLen = len + sizeof(SMsgHead)}; + if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { + tqDebug("failed to put delete req into write-queue since %s", terrstr()); + } + + return TSDB_CODE_SUCCESS; +} \ No newline at end of file