From 204fb97f68140232de4c7261d9308c812dbb9071 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 29 Aug 2023 17:34:31 +0800 Subject: [PATCH] fix(stream): fix error in snode, fix memory leaks, fix repeatly create bug in sink task. --- include/libs/stream/tstream.h | 2 +- include/util/types.h | 2 - source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/tq/tqSink.c | 658 +++++++++++++++++----------- source/libs/stream/src/stream.c | 2 +- source/libs/stream/src/streamMeta.c | 11 +- 6 files changed, 411 insertions(+), 266 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 6456d2baa0..89680f8f96 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -106,7 +106,7 @@ typedef struct { int8_t type; } SStreamQueueItem; -typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data); +typedef void FTbSink(SStreamTask* pTask, void* vnode, void* data); typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver); typedef struct { diff --git a/include/util/types.h b/include/util/types.h index b49670220b..0aa01a66f5 100644 --- a/include/util/types.h +++ b/include/util/types.h @@ -85,8 +85,6 @@ typedef uint16_t VarDataLenT; // maxVarDataLen: 65535 #define varDataVal(v) ((char *)(v) + VARSTR_HEADER_SIZE) #define varDataTLen(v) (sizeof(VarDataLenT) + varDataLen(v)) -#define NCHAR_WIDTH_TO_BYTES(n) ((n)*TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE) - typedef int32_t VarDataOffsetT; typedef struct tstr { diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 5f8e63067b..93d4b2163d 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -155,7 +155,7 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore); // tqSink int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, const char* pIdStr); -void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data); +void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data); // tqOffset char* tqOffsetBuildFName(const char* path, int32_t fVer); diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 6acc5b7457..591b61138c 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -21,9 +21,14 @@ typedef struct STableSinkInfo { uint64_t uid; - char tbName[TSDB_TABLE_NAME_LEN]; + tstr name; } STableSinkInfo; +static int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid, + SSDataBlock* pDataBlock, SStreamTask* pTask); +static int32_t doSinkDeleteBlock(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, + int64_t suid); + int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, const char* pIdStr) { int32_t totalRows = pDataBlock->info.rows; @@ -97,17 +102,17 @@ end: return ret; } -static int32_t tqGetTableInfo(SSHashObj* pTableInfoMap,uint64_t groupId, STableSinkInfo** pInfo) { +static bool tqGetTableInfo(SSHashObj* pTableInfoMap,uint64_t groupId, STableSinkInfo** pInfo) { void* pVal = tSimpleHashGet(pTableInfoMap, &groupId, sizeof(uint64_t)); if (pVal) { *pInfo = *(STableSinkInfo**)pVal; - return TSDB_CODE_SUCCESS; + return true; } - return TSDB_CODE_FAILED; + return false; } -int32_t tqPutTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STableSinkInfo* pTbl) { +static int32_t tqPutTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STableSinkInfo* pTbl) { if (tSimpleHashGetSize(tblInfo) > MAX_CACHE_TABLE_INFO_NUM) { return TSDB_CODE_FAILED; } @@ -115,7 +120,7 @@ int32_t tqPutTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STableSinkInfo* pTb return tSimpleHashPut(tblInfo, &groupId, sizeof(uint64_t), &pTbl, POINTER_BYTES); } -int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) { +static int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) { void* buf = NULL; int32_t tlen = 0; encodeCreateChildTableForRPC(pReqs, TD_VID(pVnode), &buf, &tlen); @@ -128,59 +133,32 @@ int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) { return TSDB_CODE_SUCCESS; } -void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { + +void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { const SArray* pBlocks = (const SArray*)data; SVnode* pVnode = (SVnode*)vnode; int64_t suid = pTask->tbSink.stbUid; char* stbFullName = pTask->tbSink.stbFullName; STSchema* pTSchema = pTask->tbSink.pTSchema; + int32_t vgId = TD_VID(pVnode); + int32_t numOfBlocks = taosArrayGetSize(pBlocks); + int32_t code = TSDB_CODE_SUCCESS; - int32_t blockSz = taosArrayGetSize(pBlocks); + tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table", vgId, pTask->id.idStr, numOfBlocks); - tqDebug("vgId:%d, s-task:%s write results %d blocks into table", TD_VID(pVnode), pTask->id.idStr, blockSz); - - void* pBuf = NULL; SArray* tagArray = NULL; SArray* pVals = NULL; SArray* crTblArray = NULL; - for (int32_t i = 0; i < blockSz; i++) { + for (int32_t i = 0; i < numOfBlocks; i++) { SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); int32_t rows = pDataBlock->info.rows; if (pDataBlock->info.type == STREAM_DELETE_RESULT) { - SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))}; - - tqBuildDeleteReq(stbFullName, pDataBlock, &deleteReq, pTask->id.idStr); - if (taosArrayGetSize(deleteReq.deleteReqs) == 0) { - taosArrayDestroy(deleteReq.deleteReqs); - continue; - } - - int32_t len; - int32_t code; - tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code); - if (code != TSDB_CODE_SUCCESS) { - qError("s-task:%s failed to encode delete request", pTask->id.idStr); - } - - SEncoder encoder; - void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead)); - void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead)); - tEncoderInit(&encoder, abuf, len); - tEncodeSBatchDeleteReq(&encoder, &deleteReq); - tEncoderClear(&encoder); - taosArrayDestroy(deleteReq.deleteReqs); - - ((SMsgHead*)serializedDeleteReq)->vgId = pVnode->config.vgId; - - SRpcMsg msg = { .msgType = TDMT_VND_BATCH_DEL, .pCont = serializedDeleteReq, .contLen = len + sizeof(SMsgHead) }; - if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { - tqDebug("failed to put delete req into write-queue since %s", terrstr()); - } + code = doSinkDeleteBlock(pVnode, stbFullName, pDataBlock, pTask, suid); } else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) { SVCreateTbBatchReq reqs = {0}; - crTblArray = reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq)); + crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq)); if (NULL == reqs.pArray) { goto _end; } @@ -209,10 +187,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d } STagVal tagVal = { - .cid = pTSchema->numOfCols + 1, - .type = TSDB_DATA_TYPE_UBIGINT, - .i64 = (int64_t)pDataBlock->info.id.groupId, - }; + .cid = pTSchema->numOfCols + 1, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId}; taosArrayPush(tagArray, &tagVal); @@ -254,7 +229,6 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d goto _end; } - pCreateTbReq->ctb.pTag = (uint8_t*)pTag; // set table name @@ -271,228 +245,18 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d if (tqPutReqToQueue(pVnode, &reqs) != TSDB_CODE_SUCCESS) { goto _end; } + tagArray = taosArrayDestroy(tagArray); taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq); crTblArray = NULL; } else if (pDataBlock->info.type == STREAM_CHECKPOINT) { continue; } else { - SSubmitTbData tbData = {0}; - tqDebug("tq sink pipe, convert block:%d, rows:%d", i, rows); - - if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) { - goto _end; - } - - tbData.suid = suid; - tbData.uid = 0; // uid is assigned by vnode - tbData.sver = pTSchema->version; - - STableSinkInfo* pTableSinkInfo = NULL; - int32_t res = tqGetTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, &pTableSinkInfo); - if (res != TSDB_CODE_SUCCESS) { - pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo)); - } - - char* ctbName = pDataBlock->info.parTbName; - if (!ctbName[0]) { - memset(ctbName, 0, TSDB_TABLE_NAME_LEN); - if (res == TSDB_CODE_SUCCESS) { - memcpy(ctbName, pTableSinkInfo->tbName, strlen(pTableSinkInfo->tbName)); - } else { - buildCtbNameByGroupIdImpl(stbFullName, pDataBlock->info.id.groupId, ctbName); - memcpy(pTableSinkInfo->tbName, ctbName, strlen(ctbName)); - tqDebug("vgId:%d, gropuId:%" PRIu64 " datablock table name is null", TD_VID(pVnode), - pDataBlock->info.id.groupId); - } - } - - if (res == TSDB_CODE_SUCCESS) { - tbData.uid = pTableSinkInfo->uid; - } else { - SMetaReader mr = {0}; - metaReaderDoInit(&mr, pVnode->pMeta, 0); - if (metaGetTableEntryByName(&mr, ctbName) < 0) { - metaReaderClear(&mr); - taosMemoryFree(pTableSinkInfo); - tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName); - - SVCreateTbReq* pCreateTbReq = NULL; - - if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) { - goto _end; - }; - - // set const - pCreateTbReq->flags = 0; - pCreateTbReq->type = TSDB_CHILD_TABLE; - pCreateTbReq->ctb.suid = suid; - - // set super table name - SName name = {0}; - tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - pCreateTbReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name)); // taosStrdup(stbFullName); - - // set tag content - tagArray = taosArrayInit(1, sizeof(STagVal)); - if (!tagArray) { - tdDestroySVCreateTbReq(pCreateTbReq); - taosMemoryFreeClear(pCreateTbReq); - goto _end; - } - STagVal tagVal = { - .cid = pTSchema->numOfCols + 1, - .type = TSDB_DATA_TYPE_UBIGINT, - .i64 = (int64_t)pDataBlock->info.id.groupId, - }; - taosArrayPush(tagArray, &tagVal); - pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray); - - STag* pTag = NULL; - tTagNew(tagArray, 1, false, &pTag); - tagArray = taosArrayDestroy(tagArray); - if (pTag == NULL) { - tdDestroySVCreateTbReq(pCreateTbReq); - taosMemoryFreeClear(pCreateTbReq); - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _end; - } - pCreateTbReq->ctb.pTag = (uint8_t*)pTag; - - // set tag name - SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); - char tagNameStr[TSDB_COL_NAME_LEN] = {0}; - strcpy(tagNameStr, "group_id"); - taosArrayPush(tagName, tagNameStr); - pCreateTbReq->ctb.tagName = tagName; - - // set table name - pCreateTbReq->name = taosStrdup(ctbName); - - tbData.pCreateTbReq = pCreateTbReq; - tbData.flags = SUBMIT_REQ_AUTO_CREATE_TABLE; - } else { - if (mr.me.type != TSDB_CHILD_TABLE) { - tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName, - mr.me.type); - metaReaderClear(&mr); - taosMemoryFree(pTableSinkInfo); - continue; - } - - if (mr.me.ctbEntry.suid != suid) { - tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid: %" PRId64 - ", actual suid %" PRId64 "", - TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid); - metaReaderClear(&mr); - taosMemoryFree(pTableSinkInfo); - continue; - } - - tbData.uid = mr.me.uid; - pTableSinkInfo->uid = mr.me.uid; - int32_t code = tqPutTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, pTableSinkInfo); - if (code != TSDB_CODE_SUCCESS) { - taosMemoryFreeClear(pTableSinkInfo); - } - metaReaderClear(&mr); - } - } - - // rows - if (!pVals && !(pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)))) { - taosArrayDestroy(tbData.aRowP); - tdDestroySVCreateTbReq(tbData.pCreateTbReq); - goto _end; - } - - for (int32_t j = 0; j < rows; j++) { - taosArrayClear(pVals); - int32_t dataIndex = 0; - for (int32_t k = 0; k < pTSchema->numOfCols; k++) { - const STColumn* pCol = &pTSchema->columns[k]; - if (k == 0) { - SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex); - void* colData = colDataGetData(pColData, j); - tqDebug("tq sink pipe2, row %d, col %d ts %" PRId64, j, k, *(int64_t*)colData); - } - if (IS_SET_NULL(pCol)) { - SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); - taosArrayPush(pVals, &cv); - } else { - SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex); - if (colDataIsNull_s(pColData, j)) { - SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); - taosArrayPush(pVals, &cv); - dataIndex++; - } else { - void* colData = colDataGetData(pColData, j); - if (IS_STR_DATA_TYPE(pCol->type)) { - // address copy, no value - SValue sv = (SValue){.nData = varDataLen(colData), .pData = varDataVal(colData)}; - SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv); - taosArrayPush(pVals, &cv); - } else { - SValue sv; - memcpy(&sv.val, colData, tDataTypes[pCol->type].bytes); - SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv); - taosArrayPush(pVals, &cv); - } - dataIndex++; - } - } - } - SRow* pRow = NULL; - if ((terrno = tRowBuild(pVals, (STSchema*)pTSchema, &pRow)) < 0) { - tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE); - goto _end; - } - ASSERT(pRow); - taosArrayPush(tbData.aRowP, &pRow); - } - - SSubmitReq2 submitReq = {0}; - if (!(submitReq.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) { - tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE); - goto _end; - } - - taosArrayPush(submitReq.aSubmitTbData, &tbData); - - // encode - int32_t len; - int32_t code; - tEncodeSize(tEncodeSubmitReq, &submitReq, len, code); - SEncoder encoder; - len += sizeof(SSubmitReq2Msg); - pBuf = rpcMallocCont(len); - if (NULL == pBuf) { - tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); - goto _end; - } - ((SSubmitReq2Msg*)pBuf)->header.vgId = TD_VID(pVnode); - ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len); - ((SSubmitReq2Msg*)pBuf)->version = htobe64(1); - tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg)); - if (tEncodeSubmitReq(&encoder, &submitReq) < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("failed to encode submit req since %s", terrstr()); - tEncoderClear(&encoder); - rpcFreeCont(pBuf); - tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); - continue; - } - tEncoderClear(&encoder); - tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); - - SRpcMsg msg = { .msgType = TDMT_VND_SUBMIT, .pCont = pBuf, .contLen = len }; - if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { - tqDebug("failed to put into write-queue since %s", terrstr()); - } + code = doSinkResultBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask); } } - tqDebug("vgId:%d, s-task:%s write results completed", TD_VID(pVnode), pTask->id.idStr); + tqDebug("vgId:%d, s-task:%s write results completed", vgId, pTask->id.idStr); _end: taosArrayDestroy(tagArray); @@ -500,3 +264,377 @@ _end: taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq); // TODO: change } + +int32_t doSinkDeleteBlock(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, + int64_t suid) { + SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))}; + + int32_t code = tqBuildDeleteReq(stbFullName, pDataBlock, &deleteReq, pTask->id.idStr); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + if (taosArrayGetSize(deleteReq.deleteReqs) == 0) { + taosArrayDestroy(deleteReq.deleteReqs); + return TSDB_CODE_SUCCESS; + } + + int32_t len; + tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code); + if (code != TSDB_CODE_SUCCESS) { + qError("s-task:%s failed to encode delete request", pTask->id.idStr); + return code; + } + + SEncoder encoder; + void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead)); + void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead)); + tEncoderInit(&encoder, abuf, len); + tEncodeSBatchDeleteReq(&encoder, &deleteReq); + tEncoderClear(&encoder); + taosArrayDestroy(deleteReq.deleteReqs); + + ((SMsgHead*)serializedDeleteReq)->vgId = TD_VID(pVnode); + + SRpcMsg msg = {.msgType = TDMT_VND_BATCH_DEL, .pCont = serializedDeleteReq, .contLen = len + sizeof(SMsgHead)}; + if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { + tqDebug("failed to put delete req into write-queue since %s", terrstr()); + } + + return TSDB_CODE_SUCCESS; +} + +static bool isValidDestChildTable(SMetaReader* pReader, int32_t vgId, char* ctbName, int64_t suid) { + if (pReader->me.type != TSDB_CHILD_TABLE) { + tqError("vgId:%d, failed to write into %s, since table type:%d incorrect", vgId, ctbName, pReader->me.type); + return false; + } + + if (pReader->me.ctbEntry.suid != suid) { + tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid:%" PRId64 ", actual:%" PRId64, + vgId, ctbName, suid, pReader->me.ctbEntry.suid); + return false; + } + + return true; +} + +static SVCreateTbReq* buildAutoCreateTableReq(char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock) { + char* ctbName = pDataBlock->info.parTbName; + + SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)); + if (pCreateTbReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + // set tag content + SArray* tagArray = taosArrayInit(1, sizeof(STagVal)); + if (tagArray == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tdDestroySVCreateTbReq(pCreateTbReq); + taosMemoryFreeClear(pCreateTbReq); + return NULL; + } + + // set const + pCreateTbReq->flags = 0; + pCreateTbReq->type = TSDB_CHILD_TABLE; + pCreateTbReq->ctb.suid = suid; + + // set super table name + SName name = {0}; + tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + pCreateTbReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name)); + + STagVal tagVal = { .cid = numOfCols, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId}; + taosArrayPush(tagArray, &tagVal); + pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray); + + STag* pTag = NULL; + tTagNew(tagArray, 1, false, &pTag); + taosArrayDestroy(tagArray); + + if (pTag == NULL) { + tdDestroySVCreateTbReq(pCreateTbReq); + taosMemoryFreeClear(pCreateTbReq); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pCreateTbReq->ctb.pTag = (uint8_t*)pTag; + + // set tag name + SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); + taosArrayPush(tagName, "group_id"); + + pCreateTbReq->ctb.tagName = tagName; + + // set table name + pCreateTbReq->name = taosStrdup(ctbName); + return pCreateTbReq; +} + +static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, uint64_t uid, + const char* id) { + pTableSinkInfo->uid = uid; + + int32_t code = tqPutTableInfo(pSinkTableMap, groupId, pTableSinkInfo); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFreeClear(pTableSinkInfo); + tqError("s-task:%s failed to put tableSinkInfo in to cache, code:%s", id, tstrerror(code)); + } else { + tqDebug("s-task:%s new dst table:%s(uid:%" PRIu64 ") added into cache, total:%d", id, pTableSinkInfo->name.data, + pTableSinkInfo->uid, tSimpleHashGetSize(pSinkTableMap)); + } + + return code; +} + +int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid, SSDataBlock* pDataBlock, + SStreamTask* pTask) { + int32_t numOfRows = pDataBlock->info.rows; + int32_t vgId = TD_VID(pVnode); + uint64_t groupId = pDataBlock->info.id.groupId; + STSchema* pTSchema = pTask->tbSink.pTSchema; + int32_t code = TSDB_CODE_SUCCESS; + void* pBuf = NULL; + SArray* pVals = NULL; + const char* id = pTask->id.idStr; + + SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version}; + tqDebug("s-task:%s sink data pipeline, build submit msg from %d-th resBlock, including %d rows, dst suid:%" PRId64, + id, blockIndex + 1, numOfRows, suid); + + tbData.aRowP = taosArrayInit(numOfRows, sizeof(SRow*)); + pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)); + + if (tbData.aRowP == NULL || pVals == NULL) { + taosArrayDestroy(tbData.aRowP); + taosArrayDestroy(pVals); + + code = TSDB_CODE_OUT_OF_MEMORY; + tqError("s-task:%s vgId:%d failed to prepare write stream res blocks, code:%s", id, vgId, tstrerror(code)); + return code; + } + + STableSinkInfo* pTableSinkInfo = NULL; + bool exist = tqGetTableInfo(pTask->tbSink.pTblInfo, groupId, &pTableSinkInfo); + + char* dstTableName = pDataBlock->info.parTbName; + if (exist) { + if (dstTableName[0] == 0) { + tstrncpy(dstTableName, pTableSinkInfo->name.data, pTableSinkInfo->name.len + 1); + tqDebug("s-task:%s vgId:%d, gropuId:%" PRIu64 " datablock table name is null, set name:%s", id, vgId, groupId, + dstTableName); + } else { + if (pTableSinkInfo->uid != 0) { + tqDebug("s-task:%s write %d rows into groupId:%" PRIu64 " dstTable:%s(uid:%" PRIu64 ")", id, numOfRows, groupId, + dstTableName, pTableSinkInfo->uid); + } else { + tqDebug("s-task:%s write %d rows into groupId:%" PRIu64 " dstTable:%s(not set uid yet for the secondary block)", + id, numOfRows, groupId, dstTableName); + } + } + } else { // not exist + memset(dstTableName, 0, TSDB_TABLE_NAME_LEN); + buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName); + + int32_t nameLen = strlen(dstTableName); + pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen); + + pTableSinkInfo->name.len = nameLen; + memcpy(pTableSinkInfo->name.data, dstTableName, nameLen); + tqDebug("s-task:%s build new sinkTableInfo to add cache, dstTable:%s", id, dstTableName); + } + + if (exist) { + tbData.uid = pTableSinkInfo->uid; + + if (tbData.uid == 0) { + tqDebug("s-task:%s cached tableInfo uid is invalid, acquire it from meta", id); + } + + while (pTableSinkInfo->uid == 0) { + // wait for the table to be created + SMetaReader mr = {0}; + metaReaderDoInit(&mr, pVnode->pMeta, 0); + + code = metaGetTableEntryByName(&mr, dstTableName); + if (code == 0) { // table alreay exists, check its type and uid + bool isValid = isValidDestChildTable(&mr, vgId, dstTableName, suid); + if (!isValid) { // not valid table, ignore it + metaReaderClear(&mr); + + taosArrayDestroy(tbData.aRowP); + taosArrayDestroy(pVals); + + return TSDB_CODE_SUCCESS; + } else { + tqDebug("s-task:%s set uid:%"PRIu64" for dstTable:%s from meta", id, mr.me.uid, pTableSinkInfo->name.data); + + tbData.uid = mr.me.uid; + pTableSinkInfo->uid = mr.me.uid; + metaReaderClear(&mr); + } + } else { // not exist, wait and retry + metaReaderClear(&mr); + taosMsleep(100); + tqDebug("s-task:%s wait for the table:%s ready before insert data", id, dstTableName); + } + } + + } else { + // todo: this check is not safe, and results in losing of submit message from WAL. + // The auto-create option will always set to be open for those submit messages, which arrive during the period + // the creating of the destination table, due to the absence of the user-specified table in TSDB. When scanning + // data from WAL, those submit messages, with auto-created table option, will be discarded expect the first, for + // those mismatched table uids. Only the FIRST table has the correct table uid, and those remain all have + // randomly generated false table uid in the WAL. + SMetaReader mr = {0}; + metaReaderDoInit(&mr, pVnode->pMeta, 0); + + // table not in cache, let's try the extract it from tsdb meta + if (metaGetTableEntryByName(&mr, dstTableName) < 0) { + metaReaderClear(&mr); + + tqDebug("s-task:%s stream write into table:%s, table auto created", id, dstTableName); + + tbData.flags = SUBMIT_REQ_AUTO_CREATE_TABLE; + tbData.pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock); + if (tbData.pCreateTbReq == NULL) { + tqError("s-task:%s failed to build auto create table req, code:%s", id, tstrerror(terrno)); + + taosArrayDestroy(tbData.aRowP); + taosArrayDestroy(pVals); + + return terrno; + } + + doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, 0, id); + } else { + bool isValid = isValidDestChildTable(&mr, vgId, dstTableName, suid); + if (!isValid) { + metaReaderClear(&mr); + taosMemoryFree(pTableSinkInfo); + taosArrayDestroy(tbData.aRowP); + taosArrayDestroy(pVals); + return TSDB_CODE_SUCCESS; + } else { + tbData.uid = mr.me.uid; + metaReaderClear(&mr); + + doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, tbData.uid, id); + } + } + } + + // rows + for (int32_t j = 0; j < numOfRows; j++) { + taosArrayClear(pVals); + + int32_t dataIndex = 0; + for (int32_t k = 0; k < pTSchema->numOfCols; k++) { + const STColumn* pCol = &pTSchema->columns[k]; + if (k == 0) { + SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex); + void* colData = colDataGetData(pColData, j); + tqDebug("s-task:%s tq sink pipe2, row %d, col %d ts %" PRId64, id, j, k, *(int64_t*)colData); + } + + if (IS_SET_NULL(pCol)) { + SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); + taosArrayPush(pVals, &cv); + } else { + SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex); + if (colDataIsNull_s(pColData, j)) { + SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); + taosArrayPush(pVals, &cv); + dataIndex++; + } else { + void* colData = colDataGetData(pColData, j); + if (IS_STR_DATA_TYPE(pCol->type)) { + // address copy, no value + SValue sv = (SValue){.nData = varDataLen(colData), .pData = varDataVal(colData)}; + SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv); + taosArrayPush(pVals, &cv); + } else { + SValue sv; + memcpy(&sv.val, colData, tDataTypes[pCol->type].bytes); + SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv); + taosArrayPush(pVals, &cv); + } + dataIndex++; + } + } + } + + SRow* pRow = NULL; + code = tRowBuild(pVals, (STSchema*)pTSchema, &pRow); + if (code != TSDB_CODE_SUCCESS) { + tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE); + + taosArrayDestroy(tbData.aRowP); + taosArrayDestroy(pVals); + return code; + } + + ASSERT(pRow); + taosArrayPush(tbData.aRowP, &pRow); + } + + SSubmitReq2 submitReq = {0}; + if (!(submitReq.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) { + tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE); + + taosArrayDestroy(tbData.aRowP); + taosArrayDestroy(pVals); + return TSDB_CODE_OUT_OF_MEMORY; + } + + taosArrayPush(submitReq.aSubmitTbData, &tbData); + + // encode + int32_t len = 0; + tEncodeSize(tEncodeSubmitReq, &submitReq, len, code); + + SEncoder encoder; + len += sizeof(SSubmitReq2Msg); + + pBuf = rpcMallocCont(len); + if (NULL == pBuf) { + tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); + taosArrayDestroy(tbData.aRowP); + taosArrayDestroy(pVals); + } + + ((SSubmitReq2Msg*)pBuf)->header.vgId = vgId; + ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len); + ((SSubmitReq2Msg*)pBuf)->version = htobe64(1); + + tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg)); + if (tEncodeSubmitReq(&encoder, &submitReq) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tqError("failed to encode submit req, code:%s, ignore and continue", terrstr()); + tEncoderClear(&encoder); + rpcFreeCont(pBuf); + tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); + + return code; + } + + tEncoderClear(&encoder); + tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); + + SRpcMsg msg = { .msgType = TDMT_VND_SUBMIT, .pCont = pBuf, .contLen = len }; + code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg); + + if(code == TSDB_CODE_SUCCESS) { + tqDebug("s-task:%s send submit msg to dstTable:%s, numOfRows:%d", id, pTableSinkInfo->name.data, numOfRows); + } else { + tqError("s-task:%s failed to put into write-queue since %s", id, terrstr()); + } + + taosArrayDestroy(pVals); + return code; +} \ No newline at end of file diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index afd51b1e8b..ccbfdfe82b 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -221,7 +221,7 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock int32_t code = 0; int32_t type = pTask->outputInfo.type; if (type == TASK_OUTPUT__TABLE) { - pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks); + pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, pBlock->blocks); destroyStreamDataBlock(pBlock); } else if (type == TASK_OUTPUT__SMA) { pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 817ceed69c..dfd9534a60 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -254,7 +254,16 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) { void streamMetaClear(SStreamMeta* pMeta) { void* pIter = NULL; while ((pIter = taosHashIterate(pMeta->pTasks, pIter)) != NULL) { - streamMetaReleaseTask(pMeta, *(SStreamTask**)pIter); + SStreamTask* p = *(SStreamTask**)pIter; + + // release the ref by timer + if (p->triggerParam != 0 && p->info.fillHistory == 0) { // one more ref in timer + taosTmrStop(p->schedTimer); + p->triggerParam = 0; + streamMetaReleaseTask(pMeta, p); + } + + streamMetaReleaseTask(pMeta, p); } taosRemoveRef(streamBackendId, pMeta->streamBackendRid);