diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index f7bb6f85e2..b5239c14b5 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2139,22 +2139,16 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha return TSDB_CODE_FAILED; } - SSmlKv pTag = {.key = "group_id", - .keyLen = sizeof("group_id") - 1, - .type = TSDB_DATA_TYPE_UBIGINT, - .u = groupId, - .length = sizeof(uint64_t)}; + int8_t type = TSDB_DATA_TYPE_UBIGINT; + const char* name = "group_id"; + int32_t len = strlen(name) - 1; + SSmlKv pTag = { .key = name, .keyLen = len, .type = type, .u = groupId, .length = sizeof(uint64_t)}; taosArrayPush(tags, &pTag); RandTableName rname = { - .tags = tags, - .stbFullName = stbFullName, - .stbFullNameLen = strlen(stbFullName), - .ctbShortName = cname, - }; + .tags = tags, .stbFullName = stbFullName, .stbFullNameLen = strlen(stbFullName), .ctbShortName = cname}; buildChildTableName(&rname); - taosArrayDestroy(tags); if ((rname.ctbShortName && rname.ctbShortName[0]) == 0) { diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 88bdc8a4d9..c84ab831bd 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -24,13 +24,13 @@ typedef struct STableSinkInfo { tstr name; } STableSinkInfo; -static int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid, - SSDataBlock* pDataBlock, SStreamTask* pTask, SSubmitTbData* pTableData); +static int32_t setDstTableDataPayload(SStreamTask* pTask, int32_t blockIndex, SSDataBlock* pDataBlock, + SSubmitTbData* pTableData); static int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, int64_t suid); +static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName, + SSubmitTbData* pTableData); static int32_t tqBuildSubmitReq(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen); -static void fillBucket(STokenBucket* pBucket); -static bool hasAvailableToken(STokenBucket* pBucket); int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, const char* pIdStr) { @@ -246,11 +246,12 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S return code; } -static int32_t doBuildSubmitAndSendMsg(SVnode* pVnode, SStreamTask* pTask, int32_t numOfBlocks, SSubmitReq2* pReq) { +static int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks) { const char* id = pTask->id.idStr; int32_t vgId = TD_VID(pVnode); int32_t len = 0; void* pBuf = NULL; + int32_t numOfFinalBlocks = taosArrayGetSize(pReq->aSubmitTbData); int32_t code = tqBuildSubmitReq(pReq, vgId, &pBuf, &len); if (code != TSDB_CODE_SUCCESS) { @@ -261,7 +262,8 @@ static int32_t doBuildSubmitAndSendMsg(SVnode* pVnode, SStreamTask* pTask, int32 SRpcMsg msg = {.msgType = TDMT_VND_SUBMIT, .pCont = pBuf, .contLen = len}; code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg); if (code == TSDB_CODE_SUCCESS) { - tqDebug("s-task:%s vgId:%d send submit %d blocks into dstTables completed", id, vgId, numOfBlocks); + tqDebug("s-task:%s vgId:%d comp %d blocks into %d and send to dstTable(s) completed", id, vgId, numOfBlocks, + numOfFinalBlocks); } else { tqError("s-task:%s failed to put into write-queue since %s", id, terrstr()); } @@ -272,13 +274,61 @@ static int32_t doBuildSubmitAndSendMsg(SVnode* pVnode, SStreamTask* pTask, int32 SSinkTaskRecorder* pRec = &pTask->sinkRecorder; double el = (taosGetTimestampMs() - pTask->tsInfo.sinkStart) / 1000.0; tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64 - " submit into dst table, duration:%.2f Sec.", + " submit into dst table, duration:%.2f Sec.", pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, el); } return TSDB_CODE_SUCCESS; } +// merge the new submit table block with the existed blocks +// if ts in the new data block overlap with existed one, replace it +static int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id) { + int32_t oldLen = taosArrayGetSize(pExisted->aRowP); + int32_t newLen = taosArrayGetSize(pNew->aRowP); + + int32_t j = 0, k = 0; + SArray* pFinal = taosArrayInit(oldLen + newLen, POINTER_BYTES); + if (pFinal == NULL) { + tqError("s-task:%s failed to prepare merge result datablock, code:%s", id, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return TSDB_CODE_OUT_OF_MEMORY; + } + + while (j < newLen && k < oldLen) { + SRow* pNewRow = taosArrayGetP(pNew->aRowP, j); + SRow* pOldRow = taosArrayGetP(pExisted->aRowP, k); + if (pNewRow->ts <= pOldRow->ts) { + taosArrayPush(pFinal, &pNewRow); + j += 1; + + if (pNewRow->ts == pOldRow->ts) { + k += 1; + } + } else { + taosArrayPush(pFinal, &pOldRow); + k += 1; + } + } + + while (j < newLen) { + SRow* pRow = taosArrayGetP(pNew->aRowP, j++); + taosArrayPush(pFinal, &pRow); + } + + while (k < oldLen) { + SRow* pRow = taosArrayGetP(pExisted->aRowP, k++); + taosArrayPush(pFinal, &pRow); + } + + taosArrayDestroy(pNew->aRowP); + taosArrayDestroy(pExisted->aRowP); + pExisted->aRowP = pFinal; + + tqDebug("s-task:%s rows merged, final rows:%d, uid:%" PRId64 ", existed auto-create table:%d, new-block:%d", id, + (int32_t)taosArrayGetSize(pFinal), pExisted->uid, (pExisted->pCreateTbReq != NULL), (pNew->pCreateTbReq != NULL)); + return TSDB_CODE_SUCCESS; +} + void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { const SArray* pBlocks = (const SArray*)data; SVnode* pVnode = (SVnode*)vnode; @@ -294,16 +344,16 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { pTask->tsInfo.sinkStart = taosGetTimestampMs(); } - bool isMixBlocks = true; + bool onlySubmitData = true; for(int32_t i = 0; i < numOfBlocks; ++i) { SSDataBlock* p = taosArrayGet(pBlocks, i); if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) { - isMixBlocks = true; + onlySubmitData = false; break; } } - if (isMixBlocks) { + if (!onlySubmitData) { tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id, numOfBlocks); @@ -326,10 +376,18 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { } SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version}; - code = doBuildSubmitFromResBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask, &tbData); - taosArrayPush(submitReq.aSubmitTbData, &tbData); + code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData); + if (code != TSDB_CODE_SUCCESS) { + continue; + } - code = doBuildSubmitAndSendMsg(pVnode, pTask, 1, &submitReq); + code = setDstTableDataPayload(pTask, i, pDataBlock, &tbData); + if (code != TSDB_CODE_SUCCESS) { + continue; + } + + taosArrayPush(submitReq.aSubmitTbData, &tbData); + code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, 1); } } } else { @@ -345,91 +403,63 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { } bool hasSubmit = false; - for (int32_t i = 0; i < numOfBlocks; i++) { SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); - if (pDataBlock->info.type == STREAM_DELETE_RESULT) { - code = doBuildAndSendDeleteMsg(pVnode, stbFullName, pDataBlock, pTask, suid); - } else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) { - code = doBuildAndSendCreateTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid); - } else if (pDataBlock->info.type == STREAM_CHECKPOINT) { + if (pDataBlock->info.type == STREAM_CHECKPOINT) { continue; - } else { - hasSubmit = true; - pTask->sinkRecorder.numOfBlocks += 1; + } - SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version}; - code = doBuildSubmitFromResBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask, &tbData); + hasSubmit = true; + pTask->sinkRecorder.numOfBlocks += 1; + uint64_t groupId = pDataBlock->info.id.groupId; - int32_t* index = taosHashGet(pTableIndexMap, &tbData.uid, sizeof(tbData.uid)); - if (index == NULL) { // no data yet, append it - taosArrayPush(submitReq.aSubmitTbData, &tbData); + SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version}; - int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1; - taosHashPut(pTableIndexMap, &tbData.uid, sizeof(tbData.uid), &size, sizeof(size)); - } else { - SSubmitTbData* pExisted = taosArrayGet(submitReq.aSubmitTbData, *index); - // merge the new submit table block with the existed blocks - // if ts in the new data block overlap with existed one, replace it - int32_t oldLen = taosArrayGetSize(pExisted->aRowP); - int32_t newLen = taosArrayGetSize(tbData.aRowP); - - int32_t j = 0, k = 0; - SArray* pFinal = taosArrayInit(oldLen + newLen, POINTER_BYTES); - while (j < newLen && k < oldLen) { - SRow* pNewRow = taosArrayGetP(tbData.aRowP, j); - SRow* pOldRow = taosArrayGetP(pExisted->aRowP, k); - if (pNewRow->ts <= pOldRow->ts) { - taosArrayPush(pFinal, &pNewRow); - if (pNewRow->ts < pOldRow->ts) { - j += 1; - } else { - j += 1; - k += 1; - } - } else { - taosArrayPush(pFinal, &pOldRow); - k += 1; - } - } - - while (j < newLen) { - SRow* pRow = taosArrayGetP(tbData.aRowP, j++); - taosArrayPush(pFinal, &pRow); - } - - while (k < oldLen) { - SRow* pRow = taosArrayGetP(pExisted->aRowP, k++); - taosArrayPush(pFinal, &pRow); - } - - taosArrayDestroy(tbData.aRowP); - taosArrayDestroy(pExisted->aRowP); - pExisted->aRowP = pFinal; - - tqDebug("s-task:%s rows merged, final rows:%d, uid:%" PRId64 ", existed auto-create table:%d, new-block:%d", id, - (int32_t)taosArrayGetSize(pFinal), pExisted->uid, (pExisted->pCreateTbReq != NULL), (tbData.pCreateTbReq != NULL)); + int32_t* index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId)); + if (index == NULL) { // no data yet, append it + code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData); + if (code != TSDB_CODE_SUCCESS) { + continue; } - pTask->sinkRecorder.numOfRows += pDataBlock->info.rows; + code = setDstTableDataPayload(pTask, i, pDataBlock, &tbData); + if (code != TSDB_CODE_SUCCESS) { + continue; + } + + taosArrayPush(submitReq.aSubmitTbData, &tbData); + + int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1; + taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size)); + } else { + code = setDstTableDataPayload(pTask, i, pDataBlock, &tbData); + if (code != TSDB_CODE_SUCCESS) { + continue; + } + + SSubmitTbData* pExisted = taosArrayGet(submitReq.aSubmitTbData, *index); + code = doMergeExistedRows(pExisted, &tbData, id); + if (code != TSDB_CODE_SUCCESS) { + continue; + } } + + pTask->sinkRecorder.numOfRows += pDataBlock->info.rows; } taosHashCleanup(pTableIndexMap); if (hasSubmit) { - doBuildSubmitAndSendMsg(pVnode, pTask, numOfBlocks, &submitReq); + doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, numOfBlocks); } else { tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); tqDebug("vgId:%d, s-task:%s write results completed", vgId, id); } } - - // TODO: change } int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, - int64_t suid) { + int64_t suid) { SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))}; int32_t code = tqBuildDeleteReq(stbFullName, pDataBlock, &deleteReq, pTask->id.idStr); @@ -605,153 +635,21 @@ static int32_t tsAscendingSortFn(const void* p1, const void* p2) { } } -int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid, - SSDataBlock* pDataBlock, SStreamTask* pTask, SSubmitTbData* pTableData) { - int32_t numOfRows = pDataBlock->info.rows; - int32_t vgId = TD_VID(pVnode); - uint64_t groupId = pDataBlock->info.id.groupId; - STSchema* pTSchema = pTask->tbSink.pTSchema; - int32_t code = TSDB_CODE_SUCCESS; - SArray* pVals = NULL; - const char* id = pTask->id.idStr; - - tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64, - id, blockIndex + 1, numOfRows, suid); +static int32_t doConvertRows(SSubmitTbData* pTableData, STSchema* pTSchema, SSDataBlock* pDataBlock, const char* id) { + int32_t numOfRows = pDataBlock->info.rows; + int32_t code = TSDB_CODE_SUCCESS; + SArray* pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)); pTableData->aRowP = taosArrayInit(numOfRows, sizeof(SRow*)); - pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)); if (pTableData->aRowP == NULL || pVals == NULL) { pTableData->aRowP = taosArrayDestroy(pTableData->aRowP); taosArrayDestroy(pVals); - code = TSDB_CODE_OUT_OF_MEMORY; - tqError("s-task:%s vgId:%d failed to prepare write stream res blocks, code:%s", id, vgId, tstrerror(code)); + tqError("s-task:%s failed to prepare write stream res blocks, code:%s", id, tstrerror(code)); return code; } - STableSinkInfo* pTableSinkInfo = NULL; - bool exist = tqGetTableInfo(pTask->tbSink.pTblInfo, groupId, &pTableSinkInfo); - - char* dstTableName = pDataBlock->info.parTbName; - if (exist) { - if (dstTableName[0] == 0) { - tstrncpy(dstTableName, pTableSinkInfo->name.data, pTableSinkInfo->name.len + 1); - tqDebug("s-task:%s vgId:%d, gropuId:%" PRIu64 " datablock table name is null, set name:%s", id, vgId, groupId, - dstTableName); - } else { - if (pTableSinkInfo->uid != 0) { - tqDebug("s-task:%s write %d rows into groupId:%" PRIu64 " dstTable:%s(uid:%" PRIu64 ")", id, numOfRows, groupId, - dstTableName, pTableSinkInfo->uid); - } else { - tqDebug("s-task:%s write %d rows into groupId:%" PRIu64 " dstTable:%s(not set uid yet for the secondary block)", - id, numOfRows, groupId, dstTableName); - } - } - } else { // not exist - if (dstTableName[0] == 0) { - memset(dstTableName, 0, TSDB_TABLE_NAME_LEN); - buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName); - } - - int32_t nameLen = strlen(dstTableName); - pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen); - - pTableSinkInfo->name.len = nameLen; - memcpy(pTableSinkInfo->name.data, dstTableName, nameLen); - tqDebug("s-task:%s build new sinkTableInfo to add cache, dstTable:%s", id, dstTableName); - } - - if (exist) { - pTableData->uid = pTableSinkInfo->uid; - - if (pTableData->uid == 0) { - tqDebug("s-task:%s cached tableInfo uid is invalid, acquire it from meta", id); - } - - while (pTableSinkInfo->uid == 0) { - if (streamTaskShouldStop(&pTask->status)) { - tqDebug("s-task:%s task will stop, quit from waiting for table:%s create", id, dstTableName); - pTableData->aRowP = taosArrayDestroy(pTableData->aRowP); - taosArrayDestroy(pVals); - - return TSDB_CODE_SUCCESS; - } - - // wait for the table to be created - SMetaReader mr = {0}; - metaReaderDoInit(&mr, pVnode->pMeta, 0); - - code = metaGetTableEntryByName(&mr, dstTableName); - if (code == 0) { // table alreay exists, check its type and uid - bool isValid = isValidDestChildTable(&mr, vgId, dstTableName, suid); - if (!isValid) { // not valid table, ignore it - metaReaderClear(&mr); - - pTableData->aRowP = taosArrayDestroy(pTableData->aRowP); - taosArrayDestroy(pVals); - - return TSDB_CODE_SUCCESS; - } else { - tqDebug("s-task:%s set uid:%"PRIu64" for dstTable:%s from meta", id, mr.me.uid, pTableSinkInfo->name.data); - - pTableData->uid = mr.me.uid; - pTableSinkInfo->uid = mr.me.uid; - metaReaderClear(&mr); - } - } else { // not exist, wait and retry - metaReaderClear(&mr); - taosMsleep(100); - tqDebug("s-task:%s wait 100ms for the table:%s ready before insert data", id, dstTableName); - } - } - - } else { - // todo: this check is not safe, and results in losing of submit message from WAL. - // The auto-create option will always set to be open for those submit messages, which arrive during the period - // the creating of the destination table, due to the absence of the user-specified table in TSDB. When scanning - // data from WAL, those submit messages, with auto-created table option, will be discarded expect the first, for - // those mismatched table uids. Only the FIRST table has the correct table uid, and those remain all have - // randomly generated false table uid in the WAL. - SMetaReader mr = {0}; - metaReaderDoInit(&mr, pVnode->pMeta, 0); - - // table not in cache, let's try the extract it from tsdb meta - if (metaGetTableEntryByName(&mr, dstTableName) < 0) { - metaReaderClear(&mr); - - tqDebug("s-task:%s stream write into table:%s, table auto created", id, dstTableName); - - pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE; - pTableData->pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock); - if (pTableData->pCreateTbReq == NULL) { - tqError("s-task:%s failed to build auto create table req, code:%s", id, tstrerror(terrno)); - - pTableData->aRowP = taosArrayDestroy(pTableData->aRowP); - taosArrayDestroy(pVals); - - return terrno; - } - - doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, 0, id); - } else { - bool isValid = isValidDestChildTable(&mr, vgId, dstTableName, suid); - if (!isValid) { - metaReaderClear(&mr); - taosMemoryFree(pTableSinkInfo); - pTableData->aRowP = taosArrayDestroy(pTableData->aRowP); - taosArrayDestroy(pVals); - return TSDB_CODE_SUCCESS; - } else { - pTableData->uid = mr.me.uid; - metaReaderClear(&mr); - - doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, pTableData->uid, id); - } - } - } - - // rows for (int32_t j = 0; j < numOfRows; j++) { taosArrayClear(pVals); @@ -795,7 +693,6 @@ int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbF code = tRowBuild(pVals, (STSchema*)pTSchema, &pRow); if (code != TSDB_CODE_SUCCESS) { tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE); - pTableData->aRowP = taosArrayDestroy(pTableData->aRowP); taosArrayDestroy(pVals); return code; @@ -805,9 +702,147 @@ int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbF taosArrayPush(pTableData->aRowP, &pRow); } + taosArrayDestroy(pVals); + return TSDB_CODE_SUCCESS; +} + +int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName, + SSubmitTbData* pTableData) { + uint64_t groupId = pDataBlock->info.id.groupId; + char* dstTableName = pDataBlock->info.parTbName; + int32_t numOfRows = pDataBlock->info.rows; + const char* id = pTask->id.idStr; + int64_t suid = pTask->tbSink.stbUid; + STSchema* pTSchema = pTask->tbSink.pTSchema; + int32_t vgId = TD_VID(pVnode); + STableSinkInfo* pTableSinkInfo = NULL; + + bool alreadyCached = tqGetTableInfo(pTask->tbSink.pTblInfo, groupId, &pTableSinkInfo); + if (alreadyCached) { + if (dstTableName[0] == 0) { // data block does not set the destination table name + tstrncpy(dstTableName, pTableSinkInfo->name.data, pTableSinkInfo->name.len + 1); + tqDebug("s-task:%s vgId:%d, gropuId:%" PRIu64 " datablock table name is null, set name:%s", id, vgId, groupId, + dstTableName); + } else { + if (pTableSinkInfo->uid != 0) { + tqDebug("s-task:%s write %d rows into groupId:%" PRIu64 " dstTable:%s(uid:%" PRIu64 ")", id, numOfRows, groupId, + dstTableName, pTableSinkInfo->uid); + } else { + tqDebug("s-task:%s write %d rows into groupId:%" PRIu64 " dstTable:%s(not set uid yet for the secondary block)", + id, numOfRows, groupId, dstTableName); + } + } + } else { // this groupId has not been kept in cache yet + if (dstTableName[0] == 0) { + memset(dstTableName, 0, TSDB_TABLE_NAME_LEN); + buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName); + } + + int32_t nameLen = strlen(dstTableName); + pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen); + + pTableSinkInfo->name.len = nameLen; + memcpy(pTableSinkInfo->name.data, dstTableName, nameLen); + tqDebug("s-task:%s build new sinkTableInfo to add cache, dstTable:%s", id, dstTableName); + } + + if (alreadyCached) { + pTableData->uid = pTableSinkInfo->uid; + + if (pTableData->uid == 0) { + tqDebug("s-task:%s cached tableInfo uid is invalid, acquire it from meta", id); + } + + while (pTableSinkInfo->uid == 0) { + if (streamTaskShouldStop(&pTask->status)) { + tqDebug("s-task:%s task will stop, quit from waiting for table:%s create", id, dstTableName); + return TSDB_CODE_SUCCESS; + } + + // wait for the table to be created + SMetaReader mr = {0}; + metaReaderDoInit(&mr, pVnode->pMeta, 0); + + int32_t code = metaGetTableEntryByName(&mr, dstTableName); + if (code == 0) { // table already exists, check its type and uid + bool isValid = isValidDestChildTable(&mr, vgId, dstTableName, suid); + if (!isValid) { // not valid table, ignore it + metaReaderClear(&mr); + return TSDB_CODE_SUCCESS; + } else { + tqDebug("s-task:%s set uid:%" PRIu64 " for dstTable:%s from meta", id, mr.me.uid, pTableSinkInfo->name.data); + + pTableData->uid = mr.me.uid; + pTableSinkInfo->uid = mr.me.uid; + metaReaderClear(&mr); + } + } else { // not exist, wait and retry + metaReaderClear(&mr); + taosMsleep(100); + tqDebug("s-task:%s wait 100ms for the table:%s ready before insert data", id, dstTableName); + } + } + + } else { + // todo: this check is not safe, and results in losing of submit message from WAL. + // The auto-create option will always set to be open for those submit messages, which arrive during the period + // the creating of the destination table, due to the absence of the user-specified table in TSDB. When scanning + // data from WAL, those submit messages, with auto-created table option, will be discarded expect the first, for + // those mismatched table uids. Only the FIRST table has the correct table uid, and those remain all have + // randomly generated false table uid in the WAL. + SMetaReader mr = {0}; + metaReaderDoInit(&mr, pVnode->pMeta, 0); + + // table not in cache, let's try the extract it from tsdb meta + if (metaGetTableEntryByName(&mr, dstTableName) < 0) { + metaReaderClear(&mr); + + tqDebug("s-task:%s stream write into table:%s, table auto created", id, dstTableName); + + pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE; + pTableData->pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock); + if (pTableData->pCreateTbReq == NULL) { + tqError("s-task:%s failed to build auto create table req, code:%s", id, tstrerror(terrno)); + return terrno; + } + + doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, 0, id); + } else { + bool isValid = isValidDestChildTable(&mr, vgId, dstTableName, suid); + if (!isValid) { + metaReaderClear(&mr); + taosMemoryFree(pTableSinkInfo); + tqError("s-task:%s vgId:%d table:%s already exists, but not child table, stream results is discarded", id, vgId, + dstTableName); + return TSDB_CODE_PAR_DUPLICATED_COLUMN; + } else { + pTableData->uid = mr.me.uid; + metaReaderClear(&mr); + doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, pTableData->uid, id); + } + } + } + + return TSDB_CODE_SUCCESS; +} + +int32_t setDstTableDataPayload(SStreamTask* pTask, int32_t blockIndex, SSDataBlock* pDataBlock, + SSubmitTbData* pTableData) { + int32_t numOfRows = pDataBlock->info.rows; + const char* id = pTask->id.idStr; + + tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64, + id, blockIndex + 1, numOfRows, pTask->tbSink.stbUid); + char* dstTableName = pDataBlock->info.parTbName; + + // convert all rows + int32_t code = doConvertRows(pTableData, pTask->tbSink.pTSchema, pDataBlock, id); + if (code != TSDB_CODE_SUCCESS) { + tqError("s-task:%s failed to convert rows from result block, code:%s", id, tstrerror(terrno)); + return code; + } + taosArraySort(pTableData->aRowP, tsAscendingSortFn); tqDebug("s-task:%s build submit msg for dstTable:%s, numOfRows:%d", id, dstTableName, numOfRows); - - taosArrayDestroy(pVals); return code; } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index a667ec2371..62873f37bc 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -365,7 +365,6 @@ static void fillBucket(STokenBucket* pBucket) { bool streamTaskHasAvailableToken(STokenBucket* pBucket) { fillBucket(pBucket); if (pBucket->numOfToken > 0) { -// qDebug("current token:%d", pBucket->numOfToken); --pBucket->numOfToken; return true; } else {