add uid check

This commit is contained in:
Liu Jicong 2022-12-08 15:56:17 +08:00
parent a2092892f7
commit 3d9cc76489
1 changed files with 62 additions and 40 deletions

View File

@ -755,7 +755,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
int64_t suid = pTask->tbSink.stbUid; int64_t suid = pTask->tbSink.stbUid;
char* stbFullName = pTask->tbSink.stbFullName; char* stbFullName = pTask->tbSink.stbFullName;
STSchema* pTSchema = pTask->tbSink.pTSchema; STSchema* pTSchema = pTask->tbSink.pTSchema;
SSchemaWrapper* pSchemaWrapper = pTask->tbSink.pSchemaWrapper; /*SSchemaWrapper* pSchemaWrapper = pTask->tbSink.pSchemaWrapper;*/
int32_t blockSz = taosArrayGetSize(pBlocks); int32_t blockSz = taosArrayGetSize(pBlocks);
@ -780,7 +780,6 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
} }
for (int32_t i = 0; i < blockSz; i++) { for (int32_t i = 0; i < blockSz; i++) {
bool createTb = true;
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
if (pDataBlock->info.type == STREAM_DELETE_RESULT) { if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
SBatchDeleteReq deleteReq = {0}; SBatchDeleteReq deleteReq = {0};
@ -818,13 +817,31 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
tqDebug("failed to put delete req into write-queue since %s", terrstr()); tqDebug("failed to put delete req into write-queue since %s", terrstr());
} }
} else { } else {
SSubmitTbData* pTbData = (SSubmitTbData*)taosMemoryCalloc(1, sizeof(SSubmitTbData)); SSubmitTbData tbData = {0};
if (!pTbData) { int32_t rows = pDataBlock->info.rows;
terrno = TSDB_CODE_OUT_OF_MEMORY; tqDebug("tq sink, convert block1 %d, rows: %d", i, rows);
if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
goto _end; goto _end;
} }
// create table req
if (createTb) { tbData.suid = suid;
tbData.uid = 0; // uid is assigned by vnode
tbData.sver = pTSchema->version;
char* ctbName = NULL;
if (pDataBlock->info.parTbName[0]) {
ctbName = strdup(pDataBlock->info.parTbName);
} else {
ctbName = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId);
}
SMetaReader mr = {0};
metaReaderInit(&mr, pVnode->pMeta, 0);
if (metaGetTableEntryByName(&mr, ctbName) < 0) {
metaReaderClear(&mr);
tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName);
SVCreateTbReq* pCreateTbReq = NULL; SVCreateTbReq* pCreateTbReq = NULL;
if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) { if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) {
@ -867,31 +884,36 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
pCreateTbReq->ctb.tagName = tagName; pCreateTbReq->ctb.tagName = tagName;
// set table name // set table name
if (pDataBlock->info.parTbName[0]) { pCreateTbReq->name = ctbName;
pCreateTbReq->name = strdup(pDataBlock->info.parTbName); ctbName = NULL;
tbData.pCreateTbReq = pCreateTbReq;
tbData.flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
} else { } else {
pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId); if (mr.me.type != TSDB_CHILD_TABLE) {
} tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName,
pTbData->pCreateTbReq = pCreateTbReq; mr.me.type);
pTbData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE; metaReaderClear(&mr);
taosMemoryFree(ctbName);
continue;
} }
pTbData->suid = suid; if (mr.me.ctbEntry.suid != suid) {
pTbData->uid = 0; // uid is assigned by vnode tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid: %" PRId64
pTbData->sver = pTSchema->version; ", actual suid %" PRId64 "",
TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid);
int32_t rows = pDataBlock->info.rows; metaReaderClear(&mr);
taosMemoryFree(ctbName);
if (!(pTbData->aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
taosMemoryFreeClear(pTbData);
goto _end;
} }
tqDebug("tq sink, convert block1 %d, rows: %d", i, rows); tbData.uid = mr.me.uid;
metaReaderClear(&mr);
taosMemoryFreeClear(ctbName);
}
// rows
if (!pVals && !(pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)))) { if (!pVals && !(pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)))) {
taosArrayDestroy(pTbData->aRowP); taosArrayDestroy(tbData.aRowP);
taosMemoryFree(pTbData);
goto _end; goto _end;
} }
@ -904,14 +926,15 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
taosArrayPush(pVals, &cv); taosArrayPush(pVals, &cv);
} else { } else {
void* data = colDataGetData(pColData, j); void* colData = colDataGetData(pColData, j);
if (IS_STR_DATA_TYPE(pCol->type)) { if (IS_STR_DATA_TYPE(pCol->type)) {
SValue sv = (SValue){.nData = varDataLen(data), .pData = varDataVal(data)}; // address copy, no value SValue sv =
(SValue){.nData = varDataLen(colData), .pData = varDataVal(colData)}; // address copy, no value
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv); SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
taosArrayPush(pVals, &cv); taosArrayPush(pVals, &cv);
} else { } else {
SValue sv; SValue sv;
memcpy(&sv.val, data, tDataTypes[pCol->type].bytes); memcpy(&sv.val, colData, tDataTypes[pCol->type].bytes);
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv); SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
taosArrayPush(pVals, &cv); taosArrayPush(pVals, &cv);
} }
@ -919,15 +942,14 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
} }
SRow* pRow = NULL; SRow* pRow = NULL;
if ((terrno = tRowBuild(pVals, (STSchema*)pTSchema, &pRow)) < 0) { if ((terrno = tRowBuild(pVals, (STSchema*)pTSchema, &pRow)) < 0) {
tDestroySSubmitTbData(pTbData, TSDB_MSG_FLG_ENCODE); tDestroySSubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
goto _end; goto _end;
} }
ASSERT(pRow); ASSERT(pRow);
taosArrayPush(pTbData->aRowP, &pRow); taosArrayPush(tbData.aRowP, &pRow);
} }
taosArrayPush(pReq->aSubmitTbData, pTbData); taosArrayPush(pReq->aSubmitTbData, &tbData);
taosMemoryFree(pTbData);
// encode // encode
int32_t len; int32_t len;