fix(stream): check dst stable schema before generating auto create table.
This commit is contained in:
parent
8dde576c6f
commit
691c1dfa7d
|
@ -237,6 +237,8 @@ typedef struct {
|
|||
int64_t stbUid;
|
||||
char stbFullName[TSDB_TABLE_FNAME_LEN];
|
||||
SSchemaWrapper* pSchemaWrapper;
|
||||
SSchemaWrapper* pTagSchema;
|
||||
bool autoCreateCtb;
|
||||
void* vnode; // not available to encoder and decoder
|
||||
FTbSink* tbSinkFunc;
|
||||
STSchema* pTSchema;
|
||||
|
|
|
@ -725,24 +725,33 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
|||
if (metaGetTableEntryByName(&mr, dstTableName) < 0) {
|
||||
metaReaderClear(&mr);
|
||||
|
||||
tqDebug("s-task:%s stream write into table:%s, table auto created", id, dstTableName);
|
||||
if (pTask->outputInfo.tbSink.autoCreateCtb) {
|
||||
tqDebug("s-task:%s stream write into table:%s, table auto created", id, dstTableName);
|
||||
|
||||
SArray* pTagArray = taosArrayInit(pTSchema->numOfCols + 1, sizeof(STagVal));
|
||||
SArray* pTagArray = taosArrayInit(pTSchema->numOfCols + 1, sizeof(STagVal));
|
||||
|
||||
pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
|
||||
pTableData->pCreateTbReq =
|
||||
buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray,
|
||||
pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1);
|
||||
taosArrayDestroy(pTagArray);
|
||||
pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
|
||||
pTableData->pCreateTbReq =
|
||||
buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray,
|
||||
pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1);
|
||||
taosArrayDestroy(pTagArray);
|
||||
|
||||
if (pTableData->pCreateTbReq == NULL) {
|
||||
tqError("s-task:%s failed to build auto create table req, code:%s", id, tstrerror(terrno));
|
||||
taosMemoryFree(pTableSinkInfo);
|
||||
return terrno;
|
||||
if (pTableData->pCreateTbReq == NULL) {
|
||||
tqError("s-task:%s failed to build auto create dst-table req:%s, code:%s", id, dstTableName,
|
||||
tstrerror(terrno));
|
||||
taosMemoryFree(pTableSinkInfo);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
pTableSinkInfo->uid = 0;
|
||||
doPutIntoCache(pTask->outputInfo.tbSink.pTblInfo, pTableSinkInfo, groupId, id);
|
||||
} else {
|
||||
metaReaderClear(&mr);
|
||||
|
||||
tqError("s-task:%s vgId:%d dst-table:%s not auto-created, and not create in tsdb, discard data", id,
|
||||
vgId, dstTableName);
|
||||
return TSDB_CODE_TDB_TABLE_NOT_EXIST;
|
||||
}
|
||||
|
||||
pTableSinkInfo->uid = 0;
|
||||
doPutIntoCache(pTask->outputInfo.tbSink.pTblInfo, pTableSinkInfo, groupId, id);
|
||||
} else {
|
||||
bool isValid = isValidDstChildTable(&mr, vgId, dstTableName, suid);
|
||||
if (!isValid) {
|
||||
|
@ -788,16 +797,34 @@ int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_
|
|||
}
|
||||
|
||||
void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
||||
const SArray* pBlocks = (const SArray*)data;
|
||||
SVnode* pVnode = (SVnode*)vnode;
|
||||
int64_t suid = pTask->outputInfo.tbSink.stbUid;
|
||||
char* stbFullName = pTask->outputInfo.tbSink.stbFullName;
|
||||
STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema;
|
||||
int32_t vgId = TD_VID(pVnode);
|
||||
int32_t numOfBlocks = taosArrayGetSize(pBlocks);
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
const char* id = pTask->id.idStr;
|
||||
int64_t earlyTs = tsdbGetEarliestTs(pVnode->pTsdb);
|
||||
const SArray* pBlocks = (const SArray*)data;
|
||||
SVnode* pVnode = (SVnode*)vnode;
|
||||
int64_t suid = pTask->outputInfo.tbSink.stbUid;
|
||||
char* stbFullName = pTask->outputInfo.tbSink.stbFullName;
|
||||
STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema;
|
||||
int32_t vgId = TD_VID(pVnode);
|
||||
int32_t numOfBlocks = taosArrayGetSize(pBlocks);
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
const char* id = pTask->id.idStr;
|
||||
int64_t earlyTs = tsdbGetEarliestTs(pVnode->pTsdb);
|
||||
STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
|
||||
|
||||
if (pTask->outputInfo.tbSink.pTagSchema == NULL) {
|
||||
SMetaReader mer1 = {0};
|
||||
metaReaderDoInit(&mer1, pVnode->pMeta, META_READER_LOCK);
|
||||
|
||||
code = metaReaderGetTableEntryByUid(&mer1, pOutputInfo->tbSink.stbUid);
|
||||
pOutputInfo->tbSink.pTagSchema = tCloneSSchemaWrapper(&mer1.me.stbEntry.schemaTag);
|
||||
metaReaderClear(&mer1);
|
||||
|
||||
SSchemaWrapper* pTagSchema = pOutputInfo->tbSink.pTagSchema;
|
||||
SSchema* pCol1 = &pTagSchema->pSchema[0];
|
||||
if (pTagSchema->nCols == 1 && pCol1->type == TSDB_DATA_TYPE_UBIGINT && strcmp(pCol1->name, "group_id") == 0) {
|
||||
pOutputInfo->tbSink.autoCreateCtb = true;
|
||||
} else {
|
||||
pOutputInfo->tbSink.autoCreateCtb = false;
|
||||
}
|
||||
}
|
||||
|
||||
bool onlySubmitData = hasOnlySubmitData(pBlocks, numOfBlocks);
|
||||
if (!onlySubmitData) {
|
||||
|
@ -829,6 +856,8 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
|||
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP};
|
||||
code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tqError("vgId:%d s-task:%s dst-table not exist, stb:%" PRId64 " discard stream results", vgId, id,
|
||||
stbFullName);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -882,6 +911,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
|||
if (index == NULL) { // no data yet, append it
|
||||
code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tqError("vgId:%d dst-table gid:%" PRId64 " not exist, discard stream results", vgId, groupId);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
@ -263,6 +263,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
|||
tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pSchemaWrapper);
|
||||
taosMemoryFree(pTask->outputInfo.tbSink.pTSchema);
|
||||
tSimpleHashCleanup(pTask->outputInfo.tbSink.pTblInfo);
|
||||
tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pTagSchema);
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
taosArrayDestroy(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue