refact: tsma result adaption
This commit is contained in:
parent
11684ceb82
commit
42b17760a2
|
@ -3264,6 +3264,8 @@ void tDestroySSubmitRsp2(SSubmitRsp2* pRsp, int32_t flag);
|
||||||
#define TSDB_MSG_FLG_ENCODE 0x1
|
#define TSDB_MSG_FLG_ENCODE 0x1
|
||||||
#define TSDB_MSG_FLG_DECODE 0x2
|
#define TSDB_MSG_FLG_DECODE 0x2
|
||||||
|
|
||||||
|
int32_t tBuildSubmitReq(int32_t vgId, SSubmitReq2* pReq, void** pData, uint32_t* pLen, __tmalloc_fn_t fp);
|
||||||
|
|
||||||
#pragma pack(pop)
|
#pragma pack(pop)
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -47,6 +47,8 @@ void *taosMemoryMallocAlign(uint32_t alignment, int64_t size);
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
|
typedef void *(*__tmalloc_fn_t)(int64_t);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -2183,11 +2183,13 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
|
||||||
int32_t vgId, tb_uid_t suid) {
|
int32_t vgId, tb_uid_t suid) {
|
||||||
SSubmitReq2* pReq = NULL;
|
SSubmitReq2* pReq = NULL;
|
||||||
SArray* pVals = NULL;
|
SArray* pVals = NULL;
|
||||||
int32_t bufSize = sizeof(SSubmitReq2);
|
|
||||||
int32_t numOfBlks = 0;
|
int32_t numOfBlks = 0;
|
||||||
int32_t sz = 1;
|
int32_t sz = 1;
|
||||||
|
|
||||||
if (!(pReq = taosMemoryMalloc(bufSize))) {
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
if (!(pReq = taosMemoryMalloc(sizeof(SSubmitReq2)))) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2205,11 +2207,10 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
|
||||||
|
|
||||||
SSubmitTbData* pTbData = (SSubmitTbData*)taosMemoryCalloc(1, sizeof(SSubmitTbData));
|
SSubmitTbData* pTbData = (SSubmitTbData*)taosMemoryCalloc(1, sizeof(SSubmitTbData));
|
||||||
if (!pTbData) {
|
if (!pTbData) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPush(pReq->aSubmitTbData, pTbData);
|
|
||||||
|
|
||||||
if(!(pTbData->aRowP = taosArrayInit(rows, sizeof(SRow*)))){
|
if(!(pTbData->aRowP = taosArrayInit(rows, sizeof(SRow*)))){
|
||||||
taosMemoryFree(pTbData);
|
taosMemoryFree(pTbData);
|
||||||
goto _end;
|
goto _end;
|
||||||
|
|
|
@ -7000,3 +7000,31 @@ void tDestroySSubmitRsp2(SSubmitRsp2 *pRsp, int32_t flag) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tBuildSubmitReq(int32_t vgId, SSubmitReq2 *pReq, void **pData, uint32_t *pLen, __tmalloc_fn_t fp) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
uint32_t len = 0;
|
||||||
|
void *pBuf = NULL;
|
||||||
|
tEncodeSize(tEncodeSSubmitReq2, pReq, len, code);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
SEncoder encoder;
|
||||||
|
len += sizeof(SMsgHead);
|
||||||
|
pBuf = (*fp)(len);
|
||||||
|
if (NULL == pBuf) {
|
||||||
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
((SMsgHead *)pBuf)->vgId = htonl(vgId);
|
||||||
|
((SMsgHead *)pBuf)->contLen = htonl(len);
|
||||||
|
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead));
|
||||||
|
code = tEncodeSSubmitReq2(&encoder, pReq);
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
*pData = pBuf;
|
||||||
|
*pLen = len;
|
||||||
|
} else {
|
||||||
|
taosMemoryFree(pBuf);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
|
@ -197,9 +197,9 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t m
|
||||||
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||||
|
|
||||||
SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema,
|
int32_t tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema,
|
||||||
SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName,
|
SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName,
|
||||||
SBatchDeleteReq* pDeleteReq);
|
SBatchDeleteReq* pDeleteReq, void* ppData, int32_t* pLen);
|
||||||
|
|
||||||
// sma
|
// sma
|
||||||
int32_t smaInit();
|
int32_t smaInit();
|
||||||
|
|
|
@ -204,18 +204,18 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
|
||||||
}
|
}
|
||||||
|
|
||||||
SBatchDeleteReq deleteReq = {0};
|
SBatchDeleteReq deleteReq = {0};
|
||||||
SSubmitReq *pSubmitReq =
|
void *pSubmitReq = NULL;
|
||||||
tqBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, &pTsmaStat->pTSma->schemaTag, true,
|
int32_t contLen = 0;
|
||||||
pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq);
|
|
||||||
// TODO deleteReq
|
|
||||||
taosArrayDestroy(deleteReq.deleteReqs);
|
|
||||||
|
|
||||||
|
if (tqBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, &pTsmaStat->pTSma->schemaTag, true,
|
||||||
if (!pSubmitReq) {
|
pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq, &pSubmitReq, &contLen) < 0) {
|
||||||
smaError("vgId:%d, failed to gen submit blk while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
|
smaError("vgId:%d, failed to gen submit msg while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
|
||||||
indexUid, tstrerror(terrno));
|
indexUid, tstrerror(terrno));
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
// TODO deleteReq
|
||||||
|
taosArrayDestroy(deleteReq.deleteReqs);
|
||||||
|
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
ASSERT(!strncasecmp("td.tsma.rst.tb", pTsmaStat->pTSma->dstTbName, 14));
|
ASSERT(!strncasecmp("td.tsma.rst.tb", pTsmaStat->pTSma->dstTbName, 14));
|
||||||
|
@ -224,7 +224,7 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
|
||||||
SRpcMsg submitReqMsg = {
|
SRpcMsg submitReqMsg = {
|
||||||
.msgType = TDMT_VND_SUBMIT,
|
.msgType = TDMT_VND_SUBMIT,
|
||||||
.pCont = pSubmitReq,
|
.pCont = pSubmitReq,
|
||||||
.contLen = ntohl(pSubmitReq->length),
|
.contLen = ntohl(contLen),
|
||||||
};
|
};
|
||||||
|
|
||||||
if (tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &submitReqMsg) < 0) {
|
if (tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &submitReqMsg) < 0) {
|
||||||
|
|
|
@ -72,6 +72,7 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema,
|
SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema,
|
||||||
SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName,
|
SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName,
|
||||||
SBatchDeleteReq* pDeleteReq) {
|
SBatchDeleteReq* pDeleteReq) {
|
||||||
|
@ -299,6 +300,181 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
int32_t tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema,
|
||||||
|
SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName,
|
||||||
|
SBatchDeleteReq* pDeleteReq, void* ppData, int32_t* pLen) {
|
||||||
|
SSubmitReq2* pReq = NULL;
|
||||||
|
SArray* tagArray = NULL;
|
||||||
|
SArray* createTbArray = NULL;
|
||||||
|
SArray* pVals = NULL;
|
||||||
|
|
||||||
|
int32_t sz = taosArrayGetSize(pBlocks);
|
||||||
|
|
||||||
|
if (!(tagArray = taosArrayInit(1, sizeof(STagVal)))) {
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!(createTbArray = taosArrayInit(sz, POINTER_BYTES))) {
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!(pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
|
||||||
|
// create table req
|
||||||
|
if (createTb) {
|
||||||
|
for (int32_t i = 0; i < sz; ++i) {
|
||||||
|
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
||||||
|
SVCreateTbReq* pCreateTbReq = NULL;
|
||||||
|
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
|
||||||
|
taosArrayPush(createTbArray, &pCreateTbReq);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) {
|
||||||
|
goto _end;
|
||||||
|
};
|
||||||
|
|
||||||
|
// don't move to the end of loop as to destroy in the end of func when error occur
|
||||||
|
taosArrayPush(createTbArray, &pCreateTbReq);
|
||||||
|
|
||||||
|
// set const
|
||||||
|
pCreateTbReq->flags = 0;
|
||||||
|
pCreateTbReq->type = TSDB_CHILD_TABLE;
|
||||||
|
pCreateTbReq->ctb.suid = suid;
|
||||||
|
|
||||||
|
// set super table name
|
||||||
|
SName name = {0};
|
||||||
|
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||||
|
pCreateTbReq->ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName);
|
||||||
|
|
||||||
|
// set tag content
|
||||||
|
taosArrayClear(tagArray);
|
||||||
|
STagVal tagVal = {
|
||||||
|
.cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
|
||||||
|
.type = TSDB_DATA_TYPE_UBIGINT,
|
||||||
|
.i64 = (int64_t)pDataBlock->info.id.groupId,
|
||||||
|
};
|
||||||
|
taosArrayPush(tagArray, &tagVal);
|
||||||
|
pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray);
|
||||||
|
|
||||||
|
STag* pTag = NULL;
|
||||||
|
tTagNew(tagArray, 1, false, &pTag);
|
||||||
|
if (pTag == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
pCreateTbReq->ctb.pTag = (uint8_t*)pTag;
|
||||||
|
|
||||||
|
// set tag name
|
||||||
|
SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
|
||||||
|
char tagNameStr[TSDB_COL_NAME_LEN] = {0};
|
||||||
|
strcpy(tagNameStr, "group_id");
|
||||||
|
taosArrayPush(tagName, tagNameStr);
|
||||||
|
pCreateTbReq->ctb.tagName = tagName;
|
||||||
|
|
||||||
|
// set table name
|
||||||
|
if (pDataBlock->info.parTbName[0]) {
|
||||||
|
pCreateTbReq->name = strdup(pDataBlock->info.parTbName);
|
||||||
|
} else {
|
||||||
|
pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// SSubmitTbData req
|
||||||
|
for (int32_t i = 0; i < sz; ++i) {
|
||||||
|
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
||||||
|
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
|
||||||
|
pDeleteReq->suid = suid;
|
||||||
|
pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
|
||||||
|
tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, pDeleteReq);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t rows = pDataBlock->info.rows;
|
||||||
|
|
||||||
|
SSubmitTbData* pTbData = (SSubmitTbData*)taosMemoryCalloc(1, sizeof(SSubmitTbData));
|
||||||
|
if (!pTbData) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!(pTbData->aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
|
||||||
|
taosMemoryFree(pTbData);
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
pTbData->suid = suid;
|
||||||
|
pTbData->uid = 0; // uid is assigned by vnode
|
||||||
|
pTbData->sver = pTSchema->version;
|
||||||
|
|
||||||
|
tqDebug("tq sink, convert block1 %d, rows: %d", i, rows);
|
||||||
|
|
||||||
|
if (createTb) {
|
||||||
|
pTbData->pCreateTbReq = taosArrayGetP(createTbArray, i);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!pVals && !(pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)))) {
|
||||||
|
taosArrayDestroy(pTbData->aRowP);
|
||||||
|
taosMemoryFree(pTbData);
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t j = 0; j < rows; j++) {
|
||||||
|
taosArrayClear(pVals);
|
||||||
|
for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
|
||||||
|
const STColumn* pCol = &pTSchema->columns[k];
|
||||||
|
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k);
|
||||||
|
if (colDataIsNull_s(pColData, j)) {
|
||||||
|
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
|
||||||
|
taosArrayPush(pVals, &cv);
|
||||||
|
} else {
|
||||||
|
void* data = colDataGetData(pColData, j);
|
||||||
|
if (IS_STR_DATA_TYPE(pCol->type)) {
|
||||||
|
SValue sv = (SValue){.nData = varDataLen(data), .pData = varDataVal(data)}; // address copy, no value
|
||||||
|
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
|
||||||
|
taosArrayPush(pVals, &cv);
|
||||||
|
} else {
|
||||||
|
SValue sv;
|
||||||
|
memcpy(&sv.val, data, tDataTypes[pCol->type].bytes);
|
||||||
|
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
|
||||||
|
taosArrayPush(pVals, &cv);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
SRow* pRow = NULL;
|
||||||
|
if ((terrno = tRowBuild(pVals, (STSchema*)pTSchema, &pRow)) < 0) {
|
||||||
|
tDestroySSubmitTbData(pTbData, TSDB_MSG_FLG_ENCODE);
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
ASSERT(pRow);
|
||||||
|
taosArrayPush(pTbData->aRowP, &pRow);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayPush(pReq->aSubmitTbData, pTbData);
|
||||||
|
}
|
||||||
|
|
||||||
|
terrno = tBuildSubmitReq(TD_VID(pVnode), pReq, ppData, pLen, rpcMallocCont);
|
||||||
|
|
||||||
|
_end:
|
||||||
|
taosArrayDestroy(tagArray);
|
||||||
|
taosArrayDestroy(pVals);
|
||||||
|
tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE);
|
||||||
|
|
||||||
|
if (terrno != 0) {
|
||||||
|
taosArrayDestroy(pDeleteReq->deleteReqs);
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
||||||
const SArray* pBlocks = (const SArray*)data;
|
const SArray* pBlocks = (const SArray*)data;
|
||||||
|
|
Loading…
Reference in New Issue