diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index d160cd1fac..e7f0a6d297 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -203,10 +203,6 @@ int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqCheckLogInWal(STQ* pTq, int64_t version); -int32_t tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema, - SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName, - SBatchDeleteReq* pDeleteReq, void** ppData, int32_t* pLen); - // sma int32_t smaInit(); void smaCleanUp(); diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index e633d93de6..2179d96fe1 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -155,6 +155,200 @@ _exit: return code; } +int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *pTSchema, + SSchemaWrapper *pTagSchemaWrapper, bool createTb, int64_t suid, const char *stbFullName, + SBatchDeleteReq *pDeleteReq, void **ppData, int32_t *pLen) { + void *pBuf = NULL; + int32_t len = 0; + SSubmitReq2 *pReq = NULL; + SArray *tagArray = NULL; + SArray *createTbArray = NULL; + SArray *pVals = NULL; + + int32_t sz = taosArrayGetSize(pBlocks); + + if (!(tagArray = taosArrayInit(1, sizeof(STagVal)))) { + goto _end; + } + + if (!(createTbArray = taosArrayInit(sz, POINTER_BYTES))) { + goto _end; + } + + if (!(pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _end; + } + + if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) { + goto _end; + } + + // create table req + if (createTb) { + for (int32_t i = 0; i < sz; ++i) { + SSDataBlock *pDataBlock = taosArrayGet(pBlocks, i); + SVCreateTbReq *pCreateTbReq = NULL; + if (pDataBlock->info.type == STREAM_DELETE_RESULT) { + taosArrayPush(createTbArray, &pCreateTbReq); + continue; + } + + if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) { + goto _end; + }; + + // don't move to the end of loop as to destroy in the end of func when error occur + taosArrayPush(createTbArray, &pCreateTbReq); + + // set const + pCreateTbReq->flags = 0; + pCreateTbReq->type = TSDB_CHILD_TABLE; + pCreateTbReq->ctb.suid = suid; + + // set super table name + SName name = {0}; + tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + pCreateTbReq->ctb.stbName = strdup((char *)tNameGetTableName(&name)); // strdup(stbFullName); + + // set tag content + taosArrayClear(tagArray); + STagVal tagVal = { + .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1, + .type = TSDB_DATA_TYPE_UBIGINT, + .i64 = (int64_t)pDataBlock->info.id.groupId, + }; + taosArrayPush(tagArray, &tagVal); + pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray); + + STag *pTag = NULL; + tTagNew(tagArray, 1, false, &pTag); + if (pTag == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _end; + } + pCreateTbReq->ctb.pTag = (uint8_t *)pTag; + + // set tag name + SArray *tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); + char tagNameStr[TSDB_COL_NAME_LEN] = {0}; + strcpy(tagNameStr, "group_id"); + taosArrayPush(tagName, tagNameStr); + pCreateTbReq->ctb.tagName = tagName; + + // set table name + if (pDataBlock->info.parTbName[0]) { + pCreateTbReq->name = strdup(pDataBlock->info.parTbName); + } else { + pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId); + } + } + } + + // SSubmitTbData req + for (int32_t i = 0; i < sz; ++i) { + SSDataBlock *pDataBlock = taosArrayGet(pBlocks, i); + if (pDataBlock->info.type == STREAM_DELETE_RESULT) { + pDeleteReq->suid = suid; + pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); + tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, pDeleteReq); + continue; + } + + int32_t rows = pDataBlock->info.rows; + + SSubmitTbData *pTbData = (SSubmitTbData *)taosMemoryCalloc(1, sizeof(SSubmitTbData)); + if (!pTbData) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _end; + } + + if (!(pTbData->aRowP = taosArrayInit(rows, sizeof(SRow *)))) { + taosMemoryFree(pTbData); + goto _end; + } + pTbData->suid = suid; + pTbData->uid = 0; // uid is assigned by vnode + pTbData->sver = pTSchema->version; + + if (createTb) { + pTbData->pCreateTbReq = taosArrayGetP(createTbArray, i); + if (pTbData->pCreateTbReq) pTbData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE; + } + + if (!pVals && !(pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)))) { + taosArrayDestroy(pTbData->aRowP); + taosMemoryFree(pTbData); + goto _end; + } + + for (int32_t j = 0; j < rows; j++) { + taosArrayClear(pVals); + for (int32_t k = 0; k < pTSchema->numOfCols; k++) { + const STColumn *pCol = &pTSchema->columns[k]; + SColumnInfoData *pColData = taosArrayGet(pDataBlock->pDataBlock, k); + if (colDataIsNull_s(pColData, j)) { + SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); + taosArrayPush(pVals, &cv); + } else { + void *data = colDataGetData(pColData, j); + if (IS_STR_DATA_TYPE(pCol->type)) { + SValue sv = (SValue){.nData = varDataLen(data), .pData = varDataVal(data)}; // address copy, no value + SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv); + taosArrayPush(pVals, &cv); + } else { + SValue sv; + memcpy(&sv.val, data, tDataTypes[pCol->type].bytes); + SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv); + taosArrayPush(pVals, &cv); + } + } + } + SRow *pRow = NULL; + if ((terrno = tRowBuild(pVals, (STSchema *)pTSchema, &pRow)) < 0) { + tDestroySSubmitTbData(pTbData, TSDB_MSG_FLG_ENCODE); + goto _end; + } + ASSERT(pRow); + taosArrayPush(pTbData->aRowP, &pRow); + } + + taosArrayPush(pReq->aSubmitTbData, pTbData); + } + + // encode + tEncodeSize(tEncodeSSubmitReq2, pReq, len, terrno); + if (TSDB_CODE_SUCCESS == terrno) { + SEncoder encoder; + len += sizeof(SMsgHead); + pBuf = rpcMallocCont(len); + if (NULL == pBuf) { + goto _end; + } + ((SMsgHead *)pBuf)->vgId = htonl(TD_VID(pVnode)); + ((SMsgHead *)pBuf)->contLen = htonl(len); + tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead)); + if (tEncodeSSubmitReq2(&encoder, pReq) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tqError("failed to encode submit req since %s", terrstr()); + } + tEncoderClear(&encoder); + } +_end: + taosArrayDestroy(tagArray); + taosArrayDestroy(pVals); + tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE); + + if (terrno != 0) { + rpcFreeCont(pBuf); + taosArrayDestroy(pDeleteReq->deleteReqs); + return TSDB_CODE_FAILED; + } + if (ppData) *ppData = pBuf; + if (pLen) *pLen = len; + return TSDB_CODE_SUCCESS; +} + /** * @brief Insert/Update Time-range-wise SMA data. * @@ -220,8 +414,9 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char void *pSubmitReq = NULL; int32_t contLen = 0; - if (tqBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, &pTsmaStat->pTSma->schemaTag, true, - pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq, &pSubmitReq, &contLen) < 0) { + if (smaBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, &pTsmaStat->pTSma->schemaTag, true, + pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq, &pSubmitReq, + &contLen) < 0) { smaError("vgId:%d, failed to gen submit msg while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), indexUid, tstrerror(terrno)); goto _err; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index e8f2555f17..150ac57dfc 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -71,428 +71,6 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl return 0; } -#if 0 -SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, - SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName, - SBatchDeleteReq* pDeleteReq) { - SSubmitReq* ret = NULL; - SArray* schemaReqs = NULL; - SArray* schemaReqSz = NULL; - SArray* tagArray = taosArrayInit(1, sizeof(STagVal)); - if (!tagArray) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - - int32_t sz = taosArrayGetSize(pBlocks); - - if (createTb) { - schemaReqs = taosArrayInit(sz, sizeof(void*)); - schemaReqSz = taosArrayInit(sz, sizeof(int32_t)); - for (int32_t i = 0; i < sz; i++) { - SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); - if (pDataBlock->info.type == STREAM_DELETE_RESULT) { - int32_t padding1 = 0; - void* padding2 = NULL; - taosArrayPush(schemaReqSz, &padding1); - taosArrayPush(schemaReqs, &padding2); - continue; - } - - // STag* pTag = NULL; - // taosArrayClear(tagArray); - // SArray *tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); - // for(int j = 0; j < pTagSchemaWrapper->nCols; j++){ - // STagVal tagVal = { - // .cid = pTagSchemaWrapper->pSchema[j].colId, - // .type = pTagSchemaWrapper->pSchema[j].type, - // .i64 = (int64_t)pDataBlock->info.id.groupId, - // }; - // taosArrayPush(tagArray, &tagVal); - // taosArrayPush(tagName, pTagSchemaWrapper->pSchema[j].name); - // } - // - // tTagNew(tagArray, 1, false, &pTag); - // if (pTag == NULL) { - // terrno = TSDB_CODE_OUT_OF_MEMORY; - // taosArrayDestroy(tagArray); - // taosArrayDestroy(tagName); - // return NULL; - // } - - SVCreateTbReq createTbReq = {0}; - - // set const - createTbReq.flags = 0; - createTbReq.type = TSDB_CHILD_TABLE; - createTbReq.ctb.suid = suid; - - // set super table name - SName name = {0}; - tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - createTbReq.ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName); - - // set tag content - taosArrayClear(tagArray); - STagVal tagVal = { - .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1, - .type = TSDB_DATA_TYPE_UBIGINT, - .i64 = (int64_t)pDataBlock->info.id.groupId, - }; - taosArrayPush(tagArray, &tagVal); - createTbReq.ctb.tagNum = taosArrayGetSize(tagArray); - - STag* pTag = NULL; - tTagNew(tagArray, 1, false, &pTag); - if (pTag == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - taosArrayDestroy(tagArray); - taosArrayDestroyP(schemaReqs, taosMemoryFree); - taosArrayDestroy(schemaReqSz); - return NULL; - } - createTbReq.ctb.pTag = (uint8_t*)pTag; - - // set tag name - SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); - char tagNameStr[TSDB_COL_NAME_LEN] = {0}; - strcpy(tagNameStr, "group_id"); - taosArrayPush(tagName, tagNameStr); - createTbReq.ctb.tagName = tagName; - - // set table name - if (pDataBlock->info.parTbName[0]) { - createTbReq.name = strdup(pDataBlock->info.parTbName); - } else { - createTbReq.name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId); - } - - // save schema len - int32_t code; - int32_t schemaLen; - tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code); - if (code < 0) { - tdDestroySVCreateTbReq(&createTbReq); - taosArrayDestroy(tagArray); - taosArrayDestroyP(schemaReqs, taosMemoryFree); - taosArrayDestroy(schemaReqSz); - return NULL; - } - taosArrayPush(schemaReqSz, &schemaLen); - - // save schema str - void* schemaStr = taosMemoryMalloc(schemaLen); - if (schemaStr == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tdDestroySVCreateTbReq(&createTbReq); - taosArrayDestroy(tagArray); - taosArrayDestroyP(schemaReqs, taosMemoryFree); - taosArrayDestroy(schemaReqSz); - return NULL; - } - taosArrayPush(schemaReqs, &schemaStr); - - SEncoder encoder = {0}; - tEncoderInit(&encoder, schemaStr, schemaLen); - code = tEncodeSVCreateTbReq(&encoder, &createTbReq); - if (code < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tdDestroySVCreateTbReq(&createTbReq); - taosArrayDestroy(tagArray); - taosArrayDestroyP(schemaReqs, taosMemoryFree); - taosArrayDestroy(schemaReqSz); - tEncoderClear(&encoder); - return NULL; - } - tEncoderClear(&encoder); - tdDestroySVCreateTbReq(&createTbReq); - } - } - taosArrayDestroy(tagArray); - - // cal size - int32_t cap = sizeof(SSubmitReq); - for (int32_t i = 0; i < sz; i++) { - SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); - if (pDataBlock->info.type == STREAM_DELETE_RESULT) { - continue; - } - int32_t rows = pDataBlock->info.rows; - /*int32_t rowSize = pDataBlock->info.rowSize;*/ - int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema); - - int32_t schemaLen = 0; - if (createTb) { - schemaLen = *(int32_t*)taosArrayGet(schemaReqSz, i); - } - cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen; - } - - // assign data - ret = rpcMallocCont(cap); - ret->header.vgId = pVnode->config.vgId; - ret->length = sizeof(SSubmitReq); - ret->numOfBlocks = htonl(sz); - - SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq)); - for (int32_t i = 0; i < sz; i++) { - SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); - if (pDataBlock->info.type == STREAM_DELETE_RESULT) { - pDeleteReq->suid = suid; - pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); - tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, pDeleteReq); - continue; - } - - blkHead->numOfRows = htonl(pDataBlock->info.rows); - blkHead->sversion = htonl(pTSchema->version); - blkHead->suid = htobe64(suid); - // uid is assigned by vnode - blkHead->uid = 0; - - int32_t rows = pDataBlock->info.rows; - - int32_t dataLen = 0; - int32_t schemaLen = 0; - void* blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk)); - if (createTb) { - schemaLen = *(int32_t*)taosArrayGet(schemaReqSz, i); - void* schemaStr = taosArrayGetP(schemaReqs, i); - memcpy(blkSchema, schemaStr, schemaLen); - } - blkHead->schemaLen = htonl(schemaLen); - - STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen); - for (int32_t j = 0; j < rows; j++) { - SRowBuilder rb = {0}; - tdSRowInit(&rb, pTSchema->version); - tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen); - tdSRowResetBuf(&rb, rowData); - - for (int32_t k = 0; k < pTSchema->numOfCols; k++) { - const STColumn* pColumn = &pTSchema->columns[k]; - SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k); - if (colDataIsNull_s(pColData, j)) { - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k); - } else { - void* data = colDataGetData(pColData, j); - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k); - } - } - tdSRowEnd(&rb); - int32_t rowLen = TD_ROW_LEN(rowData); - rowData = POINTER_SHIFT(rowData, rowLen); - dataLen += rowLen; - } - blkHead->dataLen = htonl(dataLen); - - ret->length += sizeof(SSubmitBlk) + schemaLen + dataLen; - blkHead = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk) + schemaLen + dataLen); - } - - ret->length = htonl(ret->length); - - taosArrayDestroyP(schemaReqs, taosMemoryFree); - taosArrayDestroy(schemaReqSz); - - return ret; -} -#endif - -int32_t tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, - SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName, - SBatchDeleteReq* pDeleteReq, void** ppData, int32_t* pLen) { - void* pBuf = NULL; - int32_t len = 0; - SSubmitReq2* pReq = NULL; - SArray* tagArray = NULL; - SArray* createTbArray = NULL; - SArray* pVals = NULL; - - int32_t sz = taosArrayGetSize(pBlocks); - - if (!(tagArray = taosArrayInit(1, sizeof(STagVal)))) { - goto _end; - } - - if (!(createTbArray = taosArrayInit(sz, POINTER_BYTES))) { - goto _end; - } - - if (!(pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _end; - } - - if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) { - goto _end; - } - - // create table req - if (createTb) { - for (int32_t i = 0; i < sz; ++i) { - SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); - SVCreateTbReq* pCreateTbReq = NULL; - if (pDataBlock->info.type == STREAM_DELETE_RESULT) { - taosArrayPush(createTbArray, &pCreateTbReq); - continue; - } - - if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) { - goto _end; - }; - - // don't move to the end of loop as to destroy in the end of func when error occur - taosArrayPush(createTbArray, &pCreateTbReq); - - // set const - pCreateTbReq->flags = 0; - pCreateTbReq->type = TSDB_CHILD_TABLE; - pCreateTbReq->ctb.suid = suid; - - // set super table name - SName name = {0}; - tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - pCreateTbReq->ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName); - - // set tag content - taosArrayClear(tagArray); - STagVal tagVal = { - .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1, - .type = TSDB_DATA_TYPE_UBIGINT, - .i64 = (int64_t)pDataBlock->info.id.groupId, - }; - taosArrayPush(tagArray, &tagVal); - pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray); - - STag* pTag = NULL; - tTagNew(tagArray, 1, false, &pTag); - if (pTag == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _end; - } - pCreateTbReq->ctb.pTag = (uint8_t*)pTag; - - // set tag name - SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); - char tagNameStr[TSDB_COL_NAME_LEN] = {0}; - strcpy(tagNameStr, "group_id"); - taosArrayPush(tagName, tagNameStr); - pCreateTbReq->ctb.tagName = tagName; - - // set table name - if (pDataBlock->info.parTbName[0]) { - pCreateTbReq->name = strdup(pDataBlock->info.parTbName); - } else { - pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId); - } - } - } - - // SSubmitTbData req - for (int32_t i = 0; i < sz; ++i) { - SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); - if (pDataBlock->info.type == STREAM_DELETE_RESULT) { - pDeleteReq->suid = suid; - pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); - tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, pDeleteReq); - continue; - } - - int32_t rows = pDataBlock->info.rows; - - SSubmitTbData* pTbData = (SSubmitTbData*)taosMemoryCalloc(1, sizeof(SSubmitTbData)); - if (!pTbData) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _end; - } - - if (!(pTbData->aRowP = taosArrayInit(rows, sizeof(SRow*)))) { - taosMemoryFree(pTbData); - goto _end; - } - pTbData->suid = suid; - pTbData->uid = 0; // uid is assigned by vnode - pTbData->sver = pTSchema->version; - - if (createTb) { - pTbData->pCreateTbReq = taosArrayGetP(createTbArray, i); - if (pTbData->pCreateTbReq) pTbData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE; - } - - if (!pVals && !(pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)))) { - taosArrayDestroy(pTbData->aRowP); - taosMemoryFree(pTbData); - goto _end; - } - - for (int32_t j = 0; j < rows; j++) { - taosArrayClear(pVals); - for (int32_t k = 0; k < pTSchema->numOfCols; k++) { - const STColumn* pCol = &pTSchema->columns[k]; - SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k); - if (colDataIsNull_s(pColData, j)) { - SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); - taosArrayPush(pVals, &cv); - } else { - void* data = colDataGetData(pColData, j); - if (IS_STR_DATA_TYPE(pCol->type)) { - SValue sv = (SValue){.nData = varDataLen(data), .pData = varDataVal(data)}; // address copy, no value - SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv); - taosArrayPush(pVals, &cv); - } else { - SValue sv; - memcpy(&sv.val, data, tDataTypes[pCol->type].bytes); - SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv); - taosArrayPush(pVals, &cv); - } - } - } - SRow* pRow = NULL; - if ((terrno = tRowBuild(pVals, (STSchema*)pTSchema, &pRow)) < 0) { - tDestroySSubmitTbData(pTbData, TSDB_MSG_FLG_ENCODE); - goto _end; - } - ASSERT(pRow); - taosArrayPush(pTbData->aRowP, &pRow); - } - - taosArrayPush(pReq->aSubmitTbData, pTbData); - } - - // encode - tEncodeSize(tEncodeSSubmitReq2, pReq, len, terrno); - if (TSDB_CODE_SUCCESS == terrno) { - SEncoder encoder; - len += sizeof(SMsgHead); - pBuf = rpcMallocCont(len); - if (NULL == pBuf) { - goto _end; - } - ((SMsgHead*)pBuf)->vgId = htonl(TD_VID(pVnode)); - ((SMsgHead*)pBuf)->contLen = htonl(len); - tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead)); - if (tEncodeSSubmitReq2(&encoder, pReq) < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("failed to encode submit req since %s", terrstr()); - } - tEncoderClear(&encoder); - } -_end: - taosArrayDestroy(tagArray); - taosArrayDestroy(pVals); - tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE); - - if (terrno != 0) { - rpcFreeCont(pBuf); - taosArrayDestroy(pDeleteReq->deleteReqs); - return TSDB_CODE_FAILED; - } - if (ppData) *ppData = pBuf; - if (pLen) *pLen = len; - return TSDB_CODE_SUCCESS; -} - void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { const SArray* pBlocks = (const SArray*)data; SVnode* pVnode = (SVnode*)vnode; @@ -984,56 +562,3 @@ _end: taosArrayDestroy(pVals); // TODO: change } - -#if 0 -void tqSinkToTableMerge(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { - const SArray* pRes = (const SArray*)data; - SVnode* pVnode = (SVnode*)vnode; - SBatchDeleteReq deleteReq = {0}; - - tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, (int32_t)pRes->size); - - deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); - SSubmitReq* submitReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, pTask->tbSink.pSchemaWrapper, true, - pTask->tbSink.stbUid, pTask->tbSink.stbFullName, &deleteReq); - - tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId); - - if (taosArrayGetSize(deleteReq.deleteReqs) != 0) { - int32_t code; - int32_t len; - tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code); - SEncoder encoder; - void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead)); - void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead)); - tEncoderInit(&encoder, abuf, len); - tEncodeSBatchDeleteReq(&encoder, &deleteReq); - tEncoderClear(&encoder); - - ((SMsgHead*)serializedDeleteReq)->vgId = pVnode->config.vgId; - - SRpcMsg msg = { - .msgType = TDMT_VND_BATCH_DEL, - .pCont = serializedDeleteReq, - .contLen = len + sizeof(SMsgHead), - }; - if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { - rpcFreeCont(serializedDeleteReq); - tqDebug("failed to put into write-queue since %s", terrstr()); - } - } - taosArrayDestroy(deleteReq.deleteReqs); - - /*tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);*/ - // build write msg - SRpcMsg msg = { - .msgType = TDMT_VND_SUBMIT, - .pCont = submitReq, - .contLen = ntohl(submitReq->length), - }; - - if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { - tqDebug("failed to put into write-queue since %s", terrstr()); - } -} -#endif