delete invalid code
This commit is contained in:
parent
f236457904
commit
524bb214e7
|
@ -71,257 +71,6 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
|
||||||
const SArray* pBlocks = (const SArray*)data;
|
|
||||||
SVnode* pVnode = (SVnode*)vnode;
|
|
||||||
int64_t suid = pTask->tbSink.stbUid;
|
|
||||||
char* stbFullName = pTask->tbSink.stbFullName;
|
|
||||||
STSchema* pTSchema = pTask->tbSink.pTSchema;
|
|
||||||
SSchemaWrapper* pSchemaWrapper = pTask->tbSink.pSchemaWrapper;
|
|
||||||
|
|
||||||
int32_t blockSz = taosArrayGetSize(pBlocks);
|
|
||||||
|
|
||||||
SArray* tagArray = taosArrayInit(1, sizeof(STagVal));
|
|
||||||
if (!tagArray) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, blockSz);
|
|
||||||
for (int32_t i = 0; i < blockSz; i++) {
|
|
||||||
bool createTb = true;
|
|
||||||
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
|
||||||
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
|
|
||||||
SBatchDeleteReq deleteReq = {0};
|
|
||||||
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
|
|
||||||
deleteReq.suid = suid;
|
|
||||||
tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, &deleteReq);
|
|
||||||
if (taosArrayGetSize(deleteReq.deleteReqs) == 0) {
|
|
||||||
taosArrayDestroy(deleteReq.deleteReqs);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t len;
|
|
||||||
int32_t code;
|
|
||||||
tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
|
|
||||||
if (code < 0) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
SEncoder encoder;
|
|
||||||
void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
|
|
||||||
void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
|
|
||||||
tEncoderInit(&encoder, abuf, len);
|
|
||||||
tEncodeSBatchDeleteReq(&encoder, &deleteReq);
|
|
||||||
tEncoderClear(&encoder);
|
|
||||||
taosArrayDestroy(deleteReq.deleteReqs);
|
|
||||||
|
|
||||||
((SMsgHead*)serializedDeleteReq)->vgId = pVnode->config.vgId;
|
|
||||||
|
|
||||||
SRpcMsg msg = {
|
|
||||||
.msgType = TDMT_VND_BATCH_DEL,
|
|
||||||
.pCont = serializedDeleteReq,
|
|
||||||
.contLen = len + sizeof(SMsgHead),
|
|
||||||
};
|
|
||||||
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
|
|
||||||
tqDebug("failed to put delete req into write-queue since %s", terrstr());
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
char* ctbName = NULL;
|
|
||||||
// set child table name
|
|
||||||
if (pDataBlock->info.parTbName[0]) {
|
|
||||||
ctbName = taosStrdup(pDataBlock->info.parTbName);
|
|
||||||
} else {
|
|
||||||
ctbName = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t schemaLen = 0;
|
|
||||||
void* schemaStr = NULL;
|
|
||||||
|
|
||||||
int64_t uid = 0;
|
|
||||||
SMetaReader mr = {0};
|
|
||||||
metaReaderInit(&mr, pVnode->pMeta, 0);
|
|
||||||
if (metaGetTableEntryByName(&mr, ctbName) < 0) {
|
|
||||||
metaReaderClear(&mr);
|
|
||||||
tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName);
|
|
||||||
|
|
||||||
SVCreateTbReq createTbReq = {0};
|
|
||||||
|
|
||||||
// set const
|
|
||||||
createTbReq.flags = 0;
|
|
||||||
createTbReq.type = TSDB_CHILD_TABLE;
|
|
||||||
createTbReq.ctb.suid = suid;
|
|
||||||
|
|
||||||
// set super table name
|
|
||||||
SName name = {0};
|
|
||||||
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
|
||||||
createTbReq.ctb.stbName = taosStrdup((char*)tNameGetTableName(&name)); // taosStrdup(stbFullName);
|
|
||||||
createTbReq.name = ctbName;
|
|
||||||
ctbName = NULL;
|
|
||||||
|
|
||||||
// set tag content
|
|
||||||
taosArrayClear(tagArray);
|
|
||||||
STagVal tagVal = {
|
|
||||||
.cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
|
|
||||||
.type = TSDB_DATA_TYPE_UBIGINT,
|
|
||||||
.i64 = (int64_t)pDataBlock->info.id.groupId,
|
|
||||||
};
|
|
||||||
taosArrayPush(tagArray, &tagVal);
|
|
||||||
createTbReq.ctb.tagNum = taosArrayGetSize(tagArray);
|
|
||||||
|
|
||||||
STag* pTag = NULL;
|
|
||||||
tTagNew(tagArray, 1, false, &pTag);
|
|
||||||
if (pTag == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
taosArrayDestroy(tagArray);
|
|
||||||
tdDestroySVCreateTbReq(&createTbReq);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
createTbReq.ctb.pTag = (uint8_t*)pTag;
|
|
||||||
|
|
||||||
// set tag name
|
|
||||||
SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
|
|
||||||
char tagNameStr[TSDB_COL_NAME_LEN] = {0};
|
|
||||||
strcpy(tagNameStr, "group_id");
|
|
||||||
taosArrayPush(tagName, tagNameStr);
|
|
||||||
createTbReq.ctb.tagName = tagName;
|
|
||||||
|
|
||||||
int32_t code;
|
|
||||||
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
|
|
||||||
if (code < 0) {
|
|
||||||
tdDestroySVCreateTbReq(&createTbReq);
|
|
||||||
taosArrayDestroy(tagArray);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// set schema str
|
|
||||||
schemaStr = taosMemoryMalloc(schemaLen);
|
|
||||||
if (schemaStr == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
tdDestroySVCreateTbReq(&createTbReq);
|
|
||||||
taosArrayDestroy(tagArray);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
SEncoder encoder = {0};
|
|
||||||
tEncoderInit(&encoder, schemaStr, schemaLen);
|
|
||||||
code = tEncodeSVCreateTbReq(&encoder, &createTbReq);
|
|
||||||
if (code < 0) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
tdDestroySVCreateTbReq(&createTbReq);
|
|
||||||
taosArrayDestroy(tagArray);
|
|
||||||
tEncoderClear(&encoder);
|
|
||||||
taosMemoryFree(schemaStr);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
tEncoderClear(&encoder);
|
|
||||||
tdDestroySVCreateTbReq(&createTbReq);
|
|
||||||
} else {
|
|
||||||
if (mr.me.type != TSDB_CHILD_TABLE) {
|
|
||||||
tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName,
|
|
||||||
mr.me.type);
|
|
||||||
metaReaderClear(&mr);
|
|
||||||
taosMemoryFree(ctbName);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (mr.me.ctbEntry.suid != suid) {
|
|
||||||
tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid: %" PRId64
|
|
||||||
", actual suid %" PRId64 "",
|
|
||||||
TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid);
|
|
||||||
metaReaderClear(&mr);
|
|
||||||
taosMemoryFree(ctbName);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
createTb = false;
|
|
||||||
uid = mr.me.uid;
|
|
||||||
metaReaderClear(&mr);
|
|
||||||
|
|
||||||
tqDebug("vgId:%d, stream write, table %s, uid %" PRId64 " already exist, skip create", TD_VID(pVnode), ctbName,
|
|
||||||
uid);
|
|
||||||
|
|
||||||
taosMemoryFreeClear(ctbName);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t cap = sizeof(SSubmitReq);
|
|
||||||
|
|
||||||
int32_t rows = pDataBlock->info.rows;
|
|
||||||
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
|
|
||||||
|
|
||||||
cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen;
|
|
||||||
|
|
||||||
SSubmitReq* pSubmit = rpcMallocCont(cap);
|
|
||||||
pSubmit->header.vgId = pVnode->config.vgId;
|
|
||||||
pSubmit->length = sizeof(SSubmitReq);
|
|
||||||
pSubmit->numOfBlocks = htonl(1);
|
|
||||||
|
|
||||||
SSubmitBlk* blkHead = POINTER_SHIFT(pSubmit, sizeof(SSubmitReq));
|
|
||||||
|
|
||||||
blkHead->numOfRows = htonl(pDataBlock->info.rows);
|
|
||||||
blkHead->sversion = htonl(pTSchema->version);
|
|
||||||
blkHead->suid = htobe64(suid);
|
|
||||||
// uid is assigned by vnode
|
|
||||||
blkHead->uid = 0;
|
|
||||||
blkHead->schemaLen = 0;
|
|
||||||
|
|
||||||
tqDebug("tq sink pipe1, convert block2 %d, rows: %d", i, rows);
|
|
||||||
|
|
||||||
int32_t dataLen = 0;
|
|
||||||
void* blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk));
|
|
||||||
STSRow* rowData = blkSchema;
|
|
||||||
if (createTb) {
|
|
||||||
memcpy(blkSchema, schemaStr, schemaLen);
|
|
||||||
blkHead->schemaLen = htonl(schemaLen);
|
|
||||||
rowData = POINTER_SHIFT(blkSchema, schemaLen);
|
|
||||||
} else {
|
|
||||||
blkHead->uid = htobe64(uid);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosMemoryFreeClear(schemaStr);
|
|
||||||
|
|
||||||
for (int32_t j = 0; j < rows; j++) {
|
|
||||||
SRowBuilder rb = {0};
|
|
||||||
tdSRowInit(&rb, pTSchema->version);
|
|
||||||
tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen);
|
|
||||||
tdSRowResetBuf(&rb, rowData);
|
|
||||||
|
|
||||||
for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
|
|
||||||
const STColumn* pColumn = &pTSchema->columns[k];
|
|
||||||
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k);
|
|
||||||
if (colDataIsNull_s(pColData, j)) {
|
|
||||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k);
|
|
||||||
} else {
|
|
||||||
void* colData = colDataGetData(pColData, j);
|
|
||||||
if (k == 0) {
|
|
||||||
tqDebug("tq sink pipe1, row %d ts %" PRId64, j, *(int64_t*)colData);
|
|
||||||
}
|
|
||||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, pColumn->offset, k);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
tdSRowEnd(&rb);
|
|
||||||
int32_t rowLen = TD_ROW_LEN(rowData);
|
|
||||||
rowData = POINTER_SHIFT(rowData, rowLen);
|
|
||||||
dataLen += rowLen;
|
|
||||||
}
|
|
||||||
blkHead->dataLen = htonl(dataLen);
|
|
||||||
|
|
||||||
pSubmit->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
|
|
||||||
pSubmit->length = htonl(pSubmit->length);
|
|
||||||
|
|
||||||
SRpcMsg msg = {
|
|
||||||
.msgType = TDMT_VND_SUBMIT,
|
|
||||||
.pCont = pSubmit,
|
|
||||||
.contLen = ntohl(pSubmit->length),
|
|
||||||
};
|
|
||||||
|
|
||||||
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
|
|
||||||
tqDebug("failed to put into write-queue since %s", terrstr());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
taosArrayDestroy(tagArray);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t encodeCreateChildTableForRPC(SVCreateTbBatchReq* pReqs, int32_t vgId, void** pBuf, int32_t* contLen) {
|
static int32_t encodeCreateChildTableForRPC(SVCreateTbBatchReq* pReqs, int32_t vgId, void** pBuf, int32_t* contLen) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue