refactor(stream): refactor the sink module.

Haojun Liao 2023-09-11 10:09:31 +08:00
parent a9321248a9
commit 50ef500f51
3 changed files with 256 additions and 228 deletions

View File

@@ -2139,22 +2139,16 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha
return TSDB_CODE_FAILED;
}
SSmlKv pTag = {.key = "group_id",
.keyLen = sizeof("group_id") - 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.u = groupId,
.length = sizeof(uint64_t)};
int8_t type = TSDB_DATA_TYPE_UBIGINT;
const char* name = "group_id";
int32_t len = strlen(name);
SSmlKv pTag = { .key = name, .keyLen = len, .type = type, .u = groupId, .length = sizeof(uint64_t)};
taosArrayPush(tags, &pTag);
RandTableName rname = {
.tags = tags,
.stbFullName = stbFullName,
.stbFullNameLen = strlen(stbFullName),
.ctbShortName = cname,
};
.tags = tags, .stbFullName = stbFullName, .stbFullNameLen = strlen(stbFullName), .ctbShortName = cname};
buildChildTableName(&rname);
taosArrayDestroy(tags);
if ((rname.ctbShortName && rname.ctbShortName[0]) == 0) {

View File

@@ -24,13 +24,13 @@ typedef struct STableSinkInfo {
tstr name;
} STableSinkInfo;
static int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid,
SSDataBlock* pDataBlock, SStreamTask* pTask, SSubmitTbData* pTableData);
static int32_t setDstTableDataPayload(SStreamTask* pTask, int32_t blockIndex, SSDataBlock* pDataBlock,
SSubmitTbData* pTableData);
static int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
int64_t suid);
static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
SSubmitTbData* pTableData);
static int32_t tqBuildSubmitReq(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen);
static void fillBucket(STokenBucket* pBucket);
static bool hasAvailableToken(STokenBucket* pBucket);
int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
const char* pIdStr) {
@@ -246,11 +246,12 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
return code;
}
static int32_t doBuildSubmitAndSendMsg(SVnode* pVnode, SStreamTask* pTask, int32_t numOfBlocks, SSubmitReq2* pReq) {
static int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks) {
const char* id = pTask->id.idStr;
int32_t vgId = TD_VID(pVnode);
int32_t len = 0;
void* pBuf = NULL;
int32_t numOfFinalBlocks = taosArrayGetSize(pReq->aSubmitTbData);
int32_t code = tqBuildSubmitReq(pReq, vgId, &pBuf, &len);
if (code != TSDB_CODE_SUCCESS) {
@@ -261,7 +262,8 @@ static int32_t doBuildSubmitAndSendMsg(SVnode* pVnode, SStreamTask* pTask, int32
SRpcMsg msg = {.msgType = TDMT_VND_SUBMIT, .pCont = pBuf, .contLen = len};
code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg);
if (code == TSDB_CODE_SUCCESS) {
tqDebug("s-task:%s vgId:%d send submit %d blocks into dstTables completed", id, vgId, numOfBlocks);
tqDebug("s-task:%s vgId:%d comp %d blocks into %d and send to dstTable(s) completed", id, vgId, numOfBlocks,
numOfFinalBlocks);
} else {
tqError("s-task:%s failed to put into write-queue since %s", id, terrstr());
}
@@ -279,6 +281,54 @@ static int32_t doBuildSubmitAndSendMsg(SVnode* pVnode, SStreamTask* pTask, int32
return TSDB_CODE_SUCCESS;
}
// merge the new submit table block with the existing blocks
// if a ts in the new data block overlaps with an existing one, replace the existing row
static int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id) {
int32_t oldLen = taosArrayGetSize(pExisted->aRowP);
int32_t newLen = taosArrayGetSize(pNew->aRowP);
int32_t j = 0, k = 0;
SArray* pFinal = taosArrayInit(oldLen + newLen, POINTER_BYTES);
if (pFinal == NULL) {
tqError("s-task:%s failed to prepare merge result datablock, code:%s", id, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return TSDB_CODE_OUT_OF_MEMORY;
}
while (j < newLen && k < oldLen) {
SRow* pNewRow = taosArrayGetP(pNew->aRowP, j);
SRow* pOldRow = taosArrayGetP(pExisted->aRowP, k);
if (pNewRow->ts <= pOldRow->ts) {
taosArrayPush(pFinal, &pNewRow);
j += 1;
if (pNewRow->ts == pOldRow->ts) {
k += 1;
}
} else {
taosArrayPush(pFinal, &pOldRow);
k += 1;
}
}
while (j < newLen) {
SRow* pRow = taosArrayGetP(pNew->aRowP, j++);
taosArrayPush(pFinal, &pRow);
}
while (k < oldLen) {
SRow* pRow = taosArrayGetP(pExisted->aRowP, k++);
taosArrayPush(pFinal, &pRow);
}
taosArrayDestroy(pNew->aRowP);
taosArrayDestroy(pExisted->aRowP);
pExisted->aRowP = pFinal;
tqDebug("s-task:%s rows merged, final rows:%d, uid:%" PRId64 ", existed auto-create table:%d, new-block:%d", id,
(int32_t)taosArrayGetSize(pFinal), pExisted->uid, (pExisted->pCreateTbReq != NULL), (pNew->pCreateTbReq != NULL));
return TSDB_CODE_SUCCESS;
}
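For reference, doMergeExistedRows is a plain two-pointer merge of two timestamp-sorted row lists in which a row from the new block replaces an existing row with the same timestamp. Below is a minimal standalone sketch of that policy, using bare int64_t timestamps in place of SRow (illustrative only, not the tq code path):

#include <stdint.h>
#include <stdio.h>

// Merge two ascending timestamp arrays; on equal timestamps the entry from
// pNew wins and the matching entry from pOld is dropped. Returns the number
// of merged entries written to pOut (sized at least oldLen + newLen).
static int mergeRowsByTs(const int64_t* pOld, int oldLen, const int64_t* pNew, int newLen, int64_t* pOut) {
  int j = 0, k = 0, n = 0;
  while (j < newLen && k < oldLen) {
    if (pNew[j] <= pOld[k]) {
      pOut[n++] = pNew[j];
      if (pNew[j] == pOld[k]) {
        k += 1;  // same ts: the existing row is replaced
      }
      j += 1;
    } else {
      pOut[n++] = pOld[k++];
    }
  }
  while (j < newLen) pOut[n++] = pNew[j++];
  while (k < oldLen) pOut[n++] = pOld[k++];
  return n;
}

int main() {
  int64_t oldTs[] = {1, 3, 5};
  int64_t newTs[] = {2, 3, 6};
  int64_t out[6];
  int     n = mergeRowsByTs(oldTs, 3, newTs, 3, out);
  for (int i = 0; i < n; ++i) printf("%lld ", (long long)out[i]);  // prints: 1 2 3 5 6
  printf("\n");
  return 0;
}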
void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) {
const SArray* pBlocks = (const SArray*)data;
SVnode* pVnode = (SVnode*)vnode;
@@ -294,16 +344,16 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) {
pTask->tsInfo.sinkStart = taosGetTimestampMs();
}
bool isMixBlocks = true;
bool onlySubmitData = true;
for(int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* p = taosArrayGet(pBlocks, i);
if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) {
isMixBlocks = true;
onlySubmitData = false;
break;
}
}
if (isMixBlocks) {
if (!onlySubmitData) {
tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id,
numOfBlocks);
@@ -326,10 +376,18 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) {
}
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version};
code = doBuildSubmitFromResBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask, &tbData);
taosArrayPush(submitReq.aSubmitTbData, &tbData);
code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
if (code != TSDB_CODE_SUCCESS) {
continue;
}
code = doBuildSubmitAndSendMsg(pVnode, pTask, 1, &submitReq);
code = setDstTableDataPayload(pTask, i, pDataBlock, &tbData);
if (code != TSDB_CODE_SUCCESS) {
continue;
}
taosArrayPush(submitReq.aSubmitTbData, &tbData);
code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, 1);
}
}
} else {
@@ -345,87 +403,59 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) {
}
bool hasSubmit = false;
for (int32_t i = 0; i < numOfBlocks; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
code = doBuildAndSendDeleteMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
} else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
code = doBuildAndSendCreateTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
} else if (pDataBlock->info.type == STREAM_CHECKPOINT) {
if (pDataBlock->info.type == STREAM_CHECKPOINT) {
continue;
} else {
}
hasSubmit = true;
pTask->sinkRecorder.numOfBlocks += 1;
uint64_t groupId = pDataBlock->info.id.groupId;
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version};
code = doBuildSubmitFromResBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask, &tbData);
int32_t* index = taosHashGet(pTableIndexMap, &tbData.uid, sizeof(tbData.uid));
int32_t* index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId));
if (index == NULL) { // no data yet, append it
code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
if (code != TSDB_CODE_SUCCESS) {
continue;
}
code = setDstTableDataPayload(pTask, i, pDataBlock, &tbData);
if (code != TSDB_CODE_SUCCESS) {
continue;
}
taosArrayPush(submitReq.aSubmitTbData, &tbData);
int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1;
taosHashPut(pTableIndexMap, &tbData.uid, sizeof(tbData.uid), &size, sizeof(size));
taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size));
} else {
code = setDstTableDataPayload(pTask, i, pDataBlock, &tbData);
if (code != TSDB_CODE_SUCCESS) {
continue;
}
SSubmitTbData* pExisted = taosArrayGet(submitReq.aSubmitTbData, *index);
// merge the new submit table block with the existing blocks
// if a ts in the new data block overlaps with an existing one, replace the existing row
int32_t oldLen = taosArrayGetSize(pExisted->aRowP);
int32_t newLen = taosArrayGetSize(tbData.aRowP);
int32_t j = 0, k = 0;
SArray* pFinal = taosArrayInit(oldLen + newLen, POINTER_BYTES);
while (j < newLen && k < oldLen) {
SRow* pNewRow = taosArrayGetP(tbData.aRowP, j);
SRow* pOldRow = taosArrayGetP(pExisted->aRowP, k);
if (pNewRow->ts <= pOldRow->ts) {
taosArrayPush(pFinal, &pNewRow);
if (pNewRow->ts < pOldRow->ts) {
j += 1;
} else {
j += 1;
k += 1;
code = doMergeExistedRows(pExisted, &tbData, id);
if (code != TSDB_CODE_SUCCESS) {
continue;
}
} else {
taosArrayPush(pFinal, &pOldRow);
k += 1;
}
}
while (j < newLen) {
SRow* pRow = taosArrayGetP(tbData.aRowP, j++);
taosArrayPush(pFinal, &pRow);
}
while (k < oldLen) {
SRow* pRow = taosArrayGetP(pExisted->aRowP, k++);
taosArrayPush(pFinal, &pRow);
}
taosArrayDestroy(tbData.aRowP);
taosArrayDestroy(pExisted->aRowP);
pExisted->aRowP = pFinal;
tqDebug("s-task:%s rows merged, final rows:%d, uid:%" PRId64 ", existed auto-create table:%d, new-block:%d", id,
(int32_t)taosArrayGetSize(pFinal), pExisted->uid, (pExisted->pCreateTbReq != NULL), (tbData.pCreateTbReq != NULL));
}
pTask->sinkRecorder.numOfRows += pDataBlock->info.rows;
}
}
taosHashCleanup(pTableIndexMap);
if (hasSubmit) {
doBuildSubmitAndSendMsg(pVnode, pTask, numOfBlocks, &submitReq);
doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, numOfBlocks);
} else {
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
tqDebug("vgId:%d, s-task:%s write results completed", vgId, id);
}
}
// TODO: change
}
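The per-group accumulation above depends on a small pattern: pTableIndexMap maps each groupId to the slot of its entry in submitReq.aSubmitTbData, so the first block of a group is appended and later blocks of the same group are merged into the existing entry via doMergeExistedRows. Below is a minimal standalone sketch of that lookup-or-append pattern; a fixed-size table stands in for taosHash and all names are illustrative, not the tq API.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define MAX_GROUPS 64

typedef struct {
  uint64_t groupId[MAX_GROUPS];
  int32_t  slot[MAX_GROUPS];  // index into the submit array for this group
  int32_t  num;
} SGroupIndexMap;

// Returns the submit-array slot for groupId; *isNew is set when the group is
// seen for the first time and the next free slot is assigned to it.
static int32_t getOrAssignSlot(SGroupIndexMap* pMap, uint64_t groupId, bool* isNew) {
  for (int32_t i = 0; i < pMap->num; ++i) {
    if (pMap->groupId[i] == groupId) {
      *isNew = false;
      return pMap->slot[i];  // existing group: caller merges into this slot
    }
  }
  int32_t s = pMap->num;  // new group: caller appends a new entry
  pMap->groupId[pMap->num] = groupId;
  pMap->slot[pMap->num] = s;
  pMap->num += 1;
  *isNew = true;
  return s;
}

int main() {
  SGroupIndexMap map = {0};
  uint64_t groups[] = {100, 200, 100, 300, 200};
  for (int i = 0; i < 5; ++i) {
    bool    isNew = false;
    int32_t slot = getOrAssignSlot(&map, groups[i], &isNew);
    printf("group %llu -> slot %d (%s)\n", (unsigned long long)groups[i], slot, isNew ? "append" : "merge");
  }
  return 0;
}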
int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
@@ -605,153 +635,21 @@ static int32_t tsAscendingSortFn(const void* p1, const void* p2) {
}
}
int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid,
SSDataBlock* pDataBlock, SStreamTask* pTask, SSubmitTbData* pTableData) {
static int32_t doConvertRows(SSubmitTbData* pTableData, STSchema* pTSchema, SSDataBlock* pDataBlock, const char* id) {
int32_t numOfRows = pDataBlock->info.rows;
int32_t vgId = TD_VID(pVnode);
uint64_t groupId = pDataBlock->info.id.groupId;
STSchema* pTSchema = pTask->tbSink.pTSchema;
int32_t code = TSDB_CODE_SUCCESS;
SArray* pVals = NULL;
const char* id = pTask->id.idStr;
tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64,
id, blockIndex + 1, numOfRows, suid);
SArray* pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal));
pTableData->aRowP = taosArrayInit(numOfRows, sizeof(SRow*));
pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal));
if (pTableData->aRowP == NULL || pVals == NULL) {
pTableData->aRowP = taosArrayDestroy(pTableData->aRowP);
taosArrayDestroy(pVals);
code = TSDB_CODE_OUT_OF_MEMORY;
tqError("s-task:%s vgId:%d failed to prepare write stream res blocks, code:%s", id, vgId, tstrerror(code));
tqError("s-task:%s failed to prepare write stream res blocks, code:%s", id, tstrerror(code));
return code;
}
STableSinkInfo* pTableSinkInfo = NULL;
bool exist = tqGetTableInfo(pTask->tbSink.pTblInfo, groupId, &pTableSinkInfo);
char* dstTableName = pDataBlock->info.parTbName;
if (exist) {
if (dstTableName[0] == 0) {
tstrncpy(dstTableName, pTableSinkInfo->name.data, pTableSinkInfo->name.len + 1);
tqDebug("s-task:%s vgId:%d, gropuId:%" PRIu64 " datablock table name is null, set name:%s", id, vgId, groupId,
dstTableName);
} else {
if (pTableSinkInfo->uid != 0) {
tqDebug("s-task:%s write %d rows into groupId:%" PRIu64 " dstTable:%s(uid:%" PRIu64 ")", id, numOfRows, groupId,
dstTableName, pTableSinkInfo->uid);
} else {
tqDebug("s-task:%s write %d rows into groupId:%" PRIu64 " dstTable:%s(not set uid yet for the secondary block)",
id, numOfRows, groupId, dstTableName);
}
}
} else { // not exist
if (dstTableName[0] == 0) {
memset(dstTableName, 0, TSDB_TABLE_NAME_LEN);
buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName);
}
int32_t nameLen = strlen(dstTableName);
pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen);
pTableSinkInfo->name.len = nameLen;
memcpy(pTableSinkInfo->name.data, dstTableName, nameLen);
tqDebug("s-task:%s build new sinkTableInfo to add cache, dstTable:%s", id, dstTableName);
}
if (exist) {
pTableData->uid = pTableSinkInfo->uid;
if (pTableData->uid == 0) {
tqDebug("s-task:%s cached tableInfo uid is invalid, acquire it from meta", id);
}
while (pTableSinkInfo->uid == 0) {
if (streamTaskShouldStop(&pTask->status)) {
tqDebug("s-task:%s task will stop, quit from waiting for table:%s create", id, dstTableName);
pTableData->aRowP = taosArrayDestroy(pTableData->aRowP);
taosArrayDestroy(pVals);
return TSDB_CODE_SUCCESS;
}
// wait for the table to be created
SMetaReader mr = {0};
metaReaderDoInit(&mr, pVnode->pMeta, 0);
code = metaGetTableEntryByName(&mr, dstTableName);
if (code == 0) { // table already exists, check its type and uid
bool isValid = isValidDestChildTable(&mr, vgId, dstTableName, suid);
if (!isValid) { // not valid table, ignore it
metaReaderClear(&mr);
pTableData->aRowP = taosArrayDestroy(pTableData->aRowP);
taosArrayDestroy(pVals);
return TSDB_CODE_SUCCESS;
} else {
tqDebug("s-task:%s set uid:%"PRIu64" for dstTable:%s from meta", id, mr.me.uid, pTableSinkInfo->name.data);
pTableData->uid = mr.me.uid;
pTableSinkInfo->uid = mr.me.uid;
metaReaderClear(&mr);
}
} else { // not exist, wait and retry
metaReaderClear(&mr);
taosMsleep(100);
tqDebug("s-task:%s wait 100ms for the table:%s ready before insert data", id, dstTableName);
}
}
} else {
// todo: this check is not safe and may result in losing submit messages from WAL.
// The auto-create option is always enabled for submit messages that arrive while the destination table is still
// being created, because the user-specified table is not yet present in TSDB. When scanning data from WAL, all but
// the first of those auto-create submit messages will be discarded because of mismatched table uids: only the
// FIRST one carries the correct table uid, while the rest carry randomly generated, incorrect table uids in the WAL.
SMetaReader mr = {0};
metaReaderDoInit(&mr, pVnode->pMeta, 0);
// table not in cache, try to extract it from the tsdb meta
if (metaGetTableEntryByName(&mr, dstTableName) < 0) {
metaReaderClear(&mr);
tqDebug("s-task:%s stream write into table:%s, table auto created", id, dstTableName);
pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
pTableData->pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock);
if (pTableData->pCreateTbReq == NULL) {
tqError("s-task:%s failed to build auto create table req, code:%s", id, tstrerror(terrno));
pTableData->aRowP = taosArrayDestroy(pTableData->aRowP);
taosArrayDestroy(pVals);
return terrno;
}
doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, 0, id);
} else {
bool isValid = isValidDestChildTable(&mr, vgId, dstTableName, suid);
if (!isValid) {
metaReaderClear(&mr);
taosMemoryFree(pTableSinkInfo);
pTableData->aRowP = taosArrayDestroy(pTableData->aRowP);
taosArrayDestroy(pVals);
return TSDB_CODE_SUCCESS;
} else {
pTableData->uid = mr.me.uid;
metaReaderClear(&mr);
doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, pTableData->uid, id);
}
}
}
// rows
for (int32_t j = 0; j < numOfRows; j++) {
taosArrayClear(pVals);
@@ -795,7 +693,6 @@ int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbF
code = tRowBuild(pVals, (STSchema*)pTSchema, &pRow);
if (code != TSDB_CODE_SUCCESS) {
tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE);
pTableData->aRowP = taosArrayDestroy(pTableData->aRowP);
taosArrayDestroy(pVals);
return code;
@@ -805,9 +702,147 @@ int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbF
taosArrayPush(pTableData->aRowP, &pRow);
}
taosArraySort(pTableData->aRowP, tsAscendingSortFn);
tqDebug("s-task:%s build submit msg for dstTable:%s, numOfRows:%d", id, dstTableName, numOfRows);
taosArrayDestroy(pVals);
return TSDB_CODE_SUCCESS;
}
int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
SSubmitTbData* pTableData) {
uint64_t groupId = pDataBlock->info.id.groupId;
char* dstTableName = pDataBlock->info.parTbName;
int32_t numOfRows = pDataBlock->info.rows;
const char* id = pTask->id.idStr;
int64_t suid = pTask->tbSink.stbUid;
STSchema* pTSchema = pTask->tbSink.pTSchema;
int32_t vgId = TD_VID(pVnode);
STableSinkInfo* pTableSinkInfo = NULL;
bool alreadyCached = tqGetTableInfo(pTask->tbSink.pTblInfo, groupId, &pTableSinkInfo);
if (alreadyCached) {
if (dstTableName[0] == 0) { // data block does not set the destination table name
tstrncpy(dstTableName, pTableSinkInfo->name.data, pTableSinkInfo->name.len + 1);
tqDebug("s-task:%s vgId:%d, gropuId:%" PRIu64 " datablock table name is null, set name:%s", id, vgId, groupId,
dstTableName);
} else {
if (pTableSinkInfo->uid != 0) {
tqDebug("s-task:%s write %d rows into groupId:%" PRIu64 " dstTable:%s(uid:%" PRIu64 ")", id, numOfRows, groupId,
dstTableName, pTableSinkInfo->uid);
} else {
tqDebug("s-task:%s write %d rows into groupId:%" PRIu64 " dstTable:%s(not set uid yet for the secondary block)",
id, numOfRows, groupId, dstTableName);
}
}
} else { // this groupId has not been kept in cache yet
if (dstTableName[0] == 0) {
memset(dstTableName, 0, TSDB_TABLE_NAME_LEN);
buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName);
}
int32_t nameLen = strlen(dstTableName);
pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen);
pTableSinkInfo->name.len = nameLen;
memcpy(pTableSinkInfo->name.data, dstTableName, nameLen);
tqDebug("s-task:%s build new sinkTableInfo to add cache, dstTable:%s", id, dstTableName);
}
if (alreadyCached) {
pTableData->uid = pTableSinkInfo->uid;
if (pTableData->uid == 0) {
tqDebug("s-task:%s cached tableInfo uid is invalid, acquire it from meta", id);
}
while (pTableSinkInfo->uid == 0) {
if (streamTaskShouldStop(&pTask->status)) {
tqDebug("s-task:%s task will stop, quit from waiting for table:%s create", id, dstTableName);
return TSDB_CODE_SUCCESS;
}
// wait for the table to be created
SMetaReader mr = {0};
metaReaderDoInit(&mr, pVnode->pMeta, 0);
int32_t code = metaGetTableEntryByName(&mr, dstTableName);
if (code == 0) { // table already exists, check its type and uid
bool isValid = isValidDestChildTable(&mr, vgId, dstTableName, suid);
if (!isValid) { // not valid table, ignore it
metaReaderClear(&mr);
return TSDB_CODE_SUCCESS;
} else {
tqDebug("s-task:%s set uid:%" PRIu64 " for dstTable:%s from meta", id, mr.me.uid, pTableSinkInfo->name.data);
pTableData->uid = mr.me.uid;
pTableSinkInfo->uid = mr.me.uid;
metaReaderClear(&mr);
}
} else { // not exist, wait and retry
metaReaderClear(&mr);
taosMsleep(100);
tqDebug("s-task:%s wait 100ms for the table:%s ready before insert data", id, dstTableName);
}
}
} else {
// todo: this check is not safe and may result in losing submit messages from WAL.
// The auto-create option is always enabled for submit messages that arrive while the destination table is still
// being created, because the user-specified table is not yet present in TSDB. When scanning data from WAL, all but
// the first of those auto-create submit messages will be discarded because of mismatched table uids: only the
// FIRST one carries the correct table uid, while the rest carry randomly generated, incorrect table uids in the WAL.
SMetaReader mr = {0};
metaReaderDoInit(&mr, pVnode->pMeta, 0);
// table not in cache, try to extract it from the tsdb meta
if (metaGetTableEntryByName(&mr, dstTableName) < 0) {
metaReaderClear(&mr);
tqDebug("s-task:%s stream write into table:%s, table auto created", id, dstTableName);
pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
pTableData->pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock);
if (pTableData->pCreateTbReq == NULL) {
tqError("s-task:%s failed to build auto create table req, code:%s", id, tstrerror(terrno));
return terrno;
}
doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, 0, id);
} else {
bool isValid = isValidDestChildTable(&mr, vgId, dstTableName, suid);
if (!isValid) {
metaReaderClear(&mr);
taosMemoryFree(pTableSinkInfo);
tqError("s-task:%s vgId:%d table:%s already exists, but not child table, stream results is discarded", id, vgId,
dstTableName);
return TSDB_CODE_PAR_DUPLICATED_COLUMN;
} else {
pTableData->uid = mr.me.uid;
metaReaderClear(&mr);
doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, pTableData->uid, id);
}
}
}
return TSDB_CODE_SUCCESS;
}
int32_t setDstTableDataPayload(SStreamTask* pTask, int32_t blockIndex, SSDataBlock* pDataBlock,
SSubmitTbData* pTableData) {
int32_t numOfRows = pDataBlock->info.rows;
const char* id = pTask->id.idStr;
tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64,
id, blockIndex + 1, numOfRows, pTask->tbSink.stbUid);
char* dstTableName = pDataBlock->info.parTbName;
// convert all rows
int32_t code = doConvertRows(pTableData, pTask->tbSink.pTSchema, pDataBlock, id);
if (code != TSDB_CODE_SUCCESS) {
tqError("s-task:%s failed to convert rows from result block, code:%s", id, tstrerror(terrno));
return code;
}
taosArraySort(pTableData->aRowP, tsAscendingSortFn);
tqDebug("s-task:%s build submit msg for dstTable:%s, numOfRows:%d", id, dstTableName, numOfRows);
return code;
}

View File

@@ -365,7 +365,6 @@ static void fillBucket(STokenBucket* pBucket) {
bool streamTaskHasAvailableToken(STokenBucket* pBucket) {
fillBucket(pBucket);
if (pBucket->numOfToken > 0) {
// qDebug("current token:%d", pBucket->numOfToken);
--pBucket->numOfToken;
return true;
} else {
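fillBucket and streamTaskHasAvailableToken together form a token-bucket throttle for the sink: the bucket is refilled, then one token is consumed per send attempt. Below is a minimal sketch of that pattern with a time-based refill; the field names and the refill rule are assumptions for illustration, not the actual STokenBucket layout.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef struct {
  int32_t numOfToken;  // tokens currently available
  int32_t capacity;    // maximum number of tokens the bucket can hold
  int64_t fillTs;      // timestamp of the last refill, in milliseconds
  int32_t ratePerSec;  // tokens added per second (assumed refill policy)
} SSimpleTokenBucket;

// Refill the bucket according to the time elapsed since the last refill, capped at capacity.
static void simpleFillBucket(SSimpleTokenBucket* pBucket, int64_t nowMs) {
  int32_t inc = (int32_t)((nowMs - pBucket->fillTs) * pBucket->ratePerSec / 1000);
  if (inc > 0) {
    pBucket->numOfToken += inc;
    if (pBucket->numOfToken > pBucket->capacity) {
      pBucket->numOfToken = pBucket->capacity;
    }
    pBucket->fillTs = nowMs;
  }
}

// Mirrors streamTaskHasAvailableToken: refill first, then consume one token if any remain.
static bool simpleHasAvailableToken(SSimpleTokenBucket* pBucket, int64_t nowMs) {
  simpleFillBucket(pBucket, nowMs);
  if (pBucket->numOfToken > 0) {
    --pBucket->numOfToken;
    return true;
  }
  return false;
}

int main() {
  SSimpleTokenBucket bucket = {.numOfToken = 0, .capacity = 10, .fillTs = 0, .ratePerSec = 5};
  // after one second five tokens are available; the sixth attempt is throttled
  for (int i = 0; i < 6; ++i) {
    bool ok = simpleHasAvailableToken(&bucket, 1000);
    printf("attempt %d: %s\n", i + 1, ok ? "send" : "throttled");
  }
  return 0;
}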