Merge pull request #26603 from taosdata/fix/create_tb
fix(stream): check dst stable schema before create submit msg
This commit is contained in:
commit
9b3a9d52fb
|
@ -237,6 +237,8 @@ typedef struct {
|
||||||
int64_t stbUid;
|
int64_t stbUid;
|
||||||
char stbFullName[TSDB_TABLE_FNAME_LEN];
|
char stbFullName[TSDB_TABLE_FNAME_LEN];
|
||||||
SSchemaWrapper* pSchemaWrapper;
|
SSchemaWrapper* pSchemaWrapper;
|
||||||
|
SSchemaWrapper* pTagSchema;
|
||||||
|
bool autoCreateCtb;
|
||||||
void* vnode; // not available to encoder and decoder
|
void* vnode; // not available to encoder and decoder
|
||||||
FTbSink* tbSinkFunc;
|
FTbSink* tbSinkFunc;
|
||||||
STSchema* pTSchema;
|
STSchema* pTSchema;
|
||||||
|
|
|
@ -725,24 +725,33 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
||||||
if (metaGetTableEntryByName(&mr, dstTableName) < 0) {
|
if (metaGetTableEntryByName(&mr, dstTableName) < 0) {
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
|
|
||||||
tqDebug("s-task:%s stream write into table:%s, table auto created", id, dstTableName);
|
if (pTask->outputInfo.tbSink.autoCreateCtb) {
|
||||||
|
tqDebug("s-task:%s stream write into table:%s, table auto created", id, dstTableName);
|
||||||
|
|
||||||
SArray* pTagArray = taosArrayInit(pTSchema->numOfCols + 1, sizeof(STagVal));
|
SArray* pTagArray = taosArrayInit(pTSchema->numOfCols + 1, sizeof(STagVal));
|
||||||
|
|
||||||
pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
|
pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
|
||||||
pTableData->pCreateTbReq =
|
pTableData->pCreateTbReq =
|
||||||
buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray,
|
buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray,
|
||||||
pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1);
|
pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1);
|
||||||
taosArrayDestroy(pTagArray);
|
taosArrayDestroy(pTagArray);
|
||||||
|
|
||||||
if (pTableData->pCreateTbReq == NULL) {
|
if (pTableData->pCreateTbReq == NULL) {
|
||||||
tqError("s-task:%s failed to build auto create table req, code:%s", id, tstrerror(terrno));
|
tqError("s-task:%s failed to build auto create dst-table req:%s, code:%s", id, dstTableName,
|
||||||
taosMemoryFree(pTableSinkInfo);
|
tstrerror(terrno));
|
||||||
return terrno;
|
taosMemoryFree(pTableSinkInfo);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
pTableSinkInfo->uid = 0;
|
||||||
|
doPutIntoCache(pTask->outputInfo.tbSink.pTblInfo, pTableSinkInfo, groupId, id);
|
||||||
|
} else {
|
||||||
|
metaReaderClear(&mr);
|
||||||
|
|
||||||
|
tqError("s-task:%s vgId:%d dst-table:%s not auto-created, and not create in tsdb, discard data", id,
|
||||||
|
vgId, dstTableName);
|
||||||
|
return TSDB_CODE_TDB_TABLE_NOT_EXIST;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTableSinkInfo->uid = 0;
|
|
||||||
doPutIntoCache(pTask->outputInfo.tbSink.pTblInfo, pTableSinkInfo, groupId, id);
|
|
||||||
} else {
|
} else {
|
||||||
bool isValid = isValidDstChildTable(&mr, vgId, dstTableName, suid);
|
bool isValid = isValidDstChildTable(&mr, vgId, dstTableName, suid);
|
||||||
if (!isValid) {
|
if (!isValid) {
|
||||||
|
@ -788,16 +797,34 @@ int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_
|
||||||
}
|
}
|
||||||
|
|
||||||
void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
||||||
const SArray* pBlocks = (const SArray*)data;
|
const SArray* pBlocks = (const SArray*)data;
|
||||||
SVnode* pVnode = (SVnode*)vnode;
|
SVnode* pVnode = (SVnode*)vnode;
|
||||||
int64_t suid = pTask->outputInfo.tbSink.stbUid;
|
int64_t suid = pTask->outputInfo.tbSink.stbUid;
|
||||||
char* stbFullName = pTask->outputInfo.tbSink.stbFullName;
|
char* stbFullName = pTask->outputInfo.tbSink.stbFullName;
|
||||||
STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema;
|
STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema;
|
||||||
int32_t vgId = TD_VID(pVnode);
|
int32_t vgId = TD_VID(pVnode);
|
||||||
int32_t numOfBlocks = taosArrayGetSize(pBlocks);
|
int32_t numOfBlocks = taosArrayGetSize(pBlocks);
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
int64_t earlyTs = tsdbGetEarliestTs(pVnode->pTsdb);
|
int64_t earlyTs = tsdbGetEarliestTs(pVnode->pTsdb);
|
||||||
|
STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
|
||||||
|
|
||||||
|
if (pTask->outputInfo.tbSink.pTagSchema == NULL) {
|
||||||
|
SMetaReader mer1 = {0};
|
||||||
|
metaReaderDoInit(&mer1, pVnode->pMeta, META_READER_LOCK);
|
||||||
|
|
||||||
|
code = metaReaderGetTableEntryByUid(&mer1, pOutputInfo->tbSink.stbUid);
|
||||||
|
pOutputInfo->tbSink.pTagSchema = tCloneSSchemaWrapper(&mer1.me.stbEntry.schemaTag);
|
||||||
|
metaReaderClear(&mer1);
|
||||||
|
|
||||||
|
SSchemaWrapper* pTagSchema = pOutputInfo->tbSink.pTagSchema;
|
||||||
|
SSchema* pCol1 = &pTagSchema->pSchema[0];
|
||||||
|
if (pTagSchema->nCols == 1 && pCol1->type == TSDB_DATA_TYPE_UBIGINT && strcmp(pCol1->name, "group_id") == 0) {
|
||||||
|
pOutputInfo->tbSink.autoCreateCtb = true;
|
||||||
|
} else {
|
||||||
|
pOutputInfo->tbSink.autoCreateCtb = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
bool onlySubmitData = hasOnlySubmitData(pBlocks, numOfBlocks);
|
bool onlySubmitData = hasOnlySubmitData(pBlocks, numOfBlocks);
|
||||||
if (!onlySubmitData) {
|
if (!onlySubmitData) {
|
||||||
|
@ -829,6 +856,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
||||||
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP};
|
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP};
|
||||||
code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
|
code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tqError("vgId:%d s-task:%s dst-table not exist, stb:%s discard stream results", vgId, id, stbFullName);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -882,6 +910,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
||||||
if (index == NULL) { // no data yet, append it
|
if (index == NULL) { // no data yet, append it
|
||||||
code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
|
code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tqError("vgId:%d dst-table gid:%" PRId64 " not exist, discard stream results", vgId, groupId);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1162,10 +1162,10 @@ void streamClearChkptReadyMsg(SActiveCheckpointInfo* pActiveInfo) {
|
||||||
static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId, int32_t downstreamNodeId) {
|
static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId, int32_t downstreamNodeId) {
|
||||||
stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData);
|
stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData);
|
||||||
|
|
||||||
bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
|
|
||||||
clearBufferedDispatchMsg(pTask);
|
|
||||||
|
|
||||||
int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs;
|
int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs;
|
||||||
|
bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
|
||||||
|
|
||||||
|
clearBufferedDispatchMsg(pTask);
|
||||||
|
|
||||||
// put data into inputQ of current task is also allowed
|
// put data into inputQ of current task is also allowed
|
||||||
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
|
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
|
||||||
|
@ -1189,13 +1189,24 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t code, int64_t now, const char* id) {
|
static bool setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t code, int64_t now, int32_t* pNotRsp, const char* id) {
|
||||||
int32_t numOfRsp = 0;
|
int32_t numOfRsp = 0;
|
||||||
bool alreadySet = false;
|
bool alreadySet = false;
|
||||||
bool updated = false;
|
bool updated = false;
|
||||||
|
bool allRsp = false;
|
||||||
|
*pNotRsp = 0;
|
||||||
|
|
||||||
taosThreadMutexLock(&pMsgInfo->lock);
|
taosThreadMutexLock(&pMsgInfo->lock);
|
||||||
for (int32_t j = 0; j < taosArrayGetSize(pMsgInfo->pSendInfo); ++j) {
|
int32_t numOfDispatchBranch = taosArrayGetSize(pMsgInfo->pSendInfo);
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < numOfDispatchBranch; ++i) {
|
||||||
|
SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, i);
|
||||||
|
if (pEntry->rspTs != -1) {
|
||||||
|
numOfRsp += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t j = 0; j < numOfDispatchBranch; ++j) {
|
||||||
SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, j);
|
SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, j);
|
||||||
if (pEntry->nodeId == vgId) {
|
if (pEntry->nodeId == vgId) {
|
||||||
ASSERT(!alreadySet);
|
ASSERT(!alreadySet);
|
||||||
|
@ -1203,18 +1214,20 @@ static int32_t setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int3
|
||||||
pEntry->status = code;
|
pEntry->status = code;
|
||||||
alreadySet = true;
|
alreadySet = true;
|
||||||
updated = true;
|
updated = true;
|
||||||
stDebug("s-task:%s record the rsp recv, ts:%" PRId64 " code:%d, idx:%d", id, now, code, j);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pEntry->rspTs != -1) {
|
|
||||||
numOfRsp += 1;
|
numOfRsp += 1;
|
||||||
|
|
||||||
|
stDebug("s-task:%s record the rsp recv, ts:%" PRId64 " code:%d, idx:%d, total recv:%d/%d", id, now, code, j,
|
||||||
|
numOfRsp, numOfDispatchBranch);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pMsgInfo->lock);
|
*pNotRsp = numOfDispatchBranch - numOfRsp;
|
||||||
ASSERT(updated);
|
allRsp = (numOfRsp == numOfDispatchBranch);
|
||||||
|
|
||||||
return numOfRsp;
|
taosThreadMutexUnlock(&pMsgInfo->lock);
|
||||||
|
|
||||||
|
ASSERT(updated);
|
||||||
|
return allRsp;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool isDispatchRspTimeout(SDispatchEntry* pEntry, int64_t now) {
|
bool isDispatchRspTimeout(SDispatchEntry* pEntry, int64_t now) {
|
||||||
|
@ -1240,7 +1253,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
int32_t vgId = pTask->pMeta->vgId;
|
int32_t vgId = pTask->pMeta->vgId;
|
||||||
SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo;
|
SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo;
|
||||||
int64_t now = taosGetTimestampMs();
|
int64_t now = taosGetTimestampMs();
|
||||||
int32_t totalRsp = 0;
|
bool allRsp = false;
|
||||||
|
int32_t notRsp = 0;
|
||||||
|
|
||||||
taosThreadMutexLock(&pMsgInfo->lock);
|
taosThreadMutexLock(&pMsgInfo->lock);
|
||||||
int32_t msgId = pMsgInfo->msgId;
|
int32_t msgId = pMsgInfo->msgId;
|
||||||
|
@ -1269,18 +1283,18 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore
|
if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore
|
||||||
stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already",
|
stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already",
|
||||||
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId);
|
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId);
|
||||||
totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, id);
|
allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, ¬Rsp, id);
|
||||||
} else {
|
} else {
|
||||||
stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, add to retry list", id, msgId,
|
stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, add to retry list", id, msgId,
|
||||||
pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code));
|
pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code));
|
||||||
totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, code, now, id);
|
allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, code, now, ¬Rsp, id);
|
||||||
}
|
}
|
||||||
|
|
||||||
} else { // code == 0
|
} else { // code == 0
|
||||||
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
||||||
pTask->inputq.status = TASK_INPUT_STATUS__BLOCKED;
|
pTask->inputq.status = TASK_INPUT_STATUS__BLOCKED;
|
||||||
// block the input of current task, to push pressure to upstream
|
// block the input of current task, to push pressure to upstream
|
||||||
totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, pRsp->inputStatus, now, id);
|
allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, pRsp->inputStatus, now, ¬Rsp, id);
|
||||||
stTrace("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for retry dispatch", id,
|
stTrace("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for retry dispatch", id,
|
||||||
pRsp->downstreamTaskId, pRsp->downstreamNodeId);
|
pRsp->downstreamTaskId, pRsp->downstreamNodeId);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1292,7 +1306,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
|
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
|
||||||
}
|
}
|
||||||
|
|
||||||
totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, id);
|
allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, ¬Rsp, id);
|
||||||
|
|
||||||
{
|
{
|
||||||
bool delayDispatch = (pMsgInfo->dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
|
bool delayDispatch = (pMsgInfo->dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
|
||||||
|
@ -1317,13 +1331,11 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t notRsp = taosArrayGetSize(pMsgInfo->pSendInfo) - totalRsp;
|
|
||||||
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
if (notRsp > 0) {
|
if (!allRsp) {
|
||||||
stDebug(
|
stDebug(
|
||||||
"s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, "
|
"s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, "
|
||||||
"waiting "
|
"waiting for %d rsp",
|
||||||
"for %d rsp",
|
|
||||||
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code), notRsp);
|
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code), notRsp);
|
||||||
} else {
|
} else {
|
||||||
stDebug(
|
stDebug(
|
||||||
|
@ -1337,7 +1349,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
}
|
}
|
||||||
|
|
||||||
// all msg rsp already, continue
|
// all msg rsp already, continue
|
||||||
if (notRsp == 0) {
|
if (allRsp) {
|
||||||
ASSERT(pTask->outputq.status == TASK_OUTPUT_STATUS__WAIT);
|
ASSERT(pTask->outputq.status == TASK_OUTPUT_STATUS__WAIT);
|
||||||
|
|
||||||
// we need to re-try send dispatch msg to downstream tasks
|
// we need to re-try send dispatch msg to downstream tasks
|
||||||
|
|
|
@ -263,6 +263,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
||||||
tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pSchemaWrapper);
|
tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pSchemaWrapper);
|
||||||
taosMemoryFree(pTask->outputInfo.tbSink.pTSchema);
|
taosMemoryFree(pTask->outputInfo.tbSink.pTSchema);
|
||||||
tSimpleHashCleanup(pTask->outputInfo.tbSink.pTblInfo);
|
tSimpleHashCleanup(pTask->outputInfo.tbSink.pTblInfo);
|
||||||
|
tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pTagSchema);
|
||||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
taosArrayDestroy(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
|
taosArrayDestroy(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue