fix(stream):check error when vnode is closing.
This commit is contained in:
parent
f602aa965f
commit
93159729f6
|
@ -788,7 +788,7 @@ int32_t* taosGetErrno();
|
||||||
|
|
||||||
// stream
|
// stream
|
||||||
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
|
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
|
||||||
#define TSDB_CODE_STREAM_BACKPRESSURE_OUT_OF_QUEUE TAOS_DEF_ERROR_CODE(0, 0x4101)
|
#define TSDB_CODE_STREAM_EXEC_CANCELLED TAOS_DEF_ERROR_CODE(0, 0x4102)
|
||||||
|
|
||||||
// TDLite
|
// TDLite
|
||||||
#define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100)
|
#define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100)
|
||||||
|
|
|
@ -155,7 +155,7 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore);
|
||||||
// tqSink
|
// tqSink
|
||||||
int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
|
int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
|
||||||
const char* pIdStr);
|
const char* pIdStr);
|
||||||
void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data);
|
void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data);
|
||||||
|
|
||||||
// tqOffset
|
// tqOffset
|
||||||
char* tqOffsetBuildFName(const char* path, int32_t fVer);
|
char* tqOffsetBuildFName(const char* path, int32_t fVer);
|
||||||
|
|
|
@ -798,7 +798,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
pTask->smaSink.smaSink = smaHandleRes;
|
pTask->smaSink.smaSink = smaHandleRes;
|
||||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
|
} else if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
|
||||||
pTask->tbSink.vnode = pTq->pVnode;
|
pTask->tbSink.vnode = pTq->pVnode;
|
||||||
pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline;
|
pTask->tbSink.tbSinkFunc = tqSinkDataIntoDstTable;
|
||||||
|
|
||||||
int32_t ver1 = 1;
|
int32_t ver1 = 1;
|
||||||
SMetaInfo info = {0};
|
SMetaInfo info = {0};
|
||||||
|
|
|
@ -29,8 +29,18 @@ static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBloc
|
||||||
static int32_t setDstTableDataPayload(SStreamTask* pTask, int32_t blockIndex, SSDataBlock* pDataBlock,
|
static int32_t setDstTableDataPayload(SStreamTask* pTask, int32_t blockIndex, SSDataBlock* pDataBlock,
|
||||||
SSubmitTbData* pTableData);
|
SSubmitTbData* pTableData);
|
||||||
static int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
|
static int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
|
||||||
int64_t suid);
|
int64_t suid);
|
||||||
static int32_t tqBuildSubmitReq(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen);
|
static int32_t tqBuildSubmitReq(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen);
|
||||||
|
static int32_t tsAscendingSortFn(const void* p1, const void* p2);
|
||||||
|
static int32_t doConvertRows(SSubmitTbData* pTableData, STSchema* pTSchema, SSDataBlock* pDataBlock, const char* id);
|
||||||
|
static int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo,
|
||||||
|
const char* dstTableName, int64_t* uid);
|
||||||
|
static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id);
|
||||||
|
static SVCreateTbReq* buildAutoCreateTableReq(char* stbFullName, int64_t suid, int32_t numOfCols,
|
||||||
|
SSDataBlock* pDataBlock);
|
||||||
|
static bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid);
|
||||||
|
static int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id);
|
||||||
|
static int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks);
|
||||||
|
|
||||||
int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
|
int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
|
||||||
const char* pIdStr) {
|
const char* pIdStr) {
|
||||||
|
@ -115,14 +125,6 @@ static bool tqGetTableInfo(SSHashObj* pTableInfoMap,uint64_t groupId, STableSink
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tqPutTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STableSinkInfo* pTbl) {
|
|
||||||
if (tSimpleHashGetSize(tblInfo) > MAX_CACHE_TABLE_INFO_NUM) {
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
return tSimpleHashPut(tblInfo, &groupId, sizeof(uint64_t), &pTbl, POINTER_BYTES);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
|
static int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
|
||||||
void* buf = NULL;
|
void* buf = NULL;
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
|
@ -246,7 +248,7 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks) {
|
int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks) {
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
int32_t vgId = TD_VID(pVnode);
|
int32_t vgId = TD_VID(pVnode);
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
|
@ -283,7 +285,7 @@ static int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubm
|
||||||
|
|
||||||
// merge the new submit table block with the existed blocks
|
// merge the new submit table block with the existed blocks
|
||||||
// if ts in the new data block overlap with existed one, replace it
|
// if ts in the new data block overlap with existed one, replace it
|
||||||
static int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id) {
|
int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id) {
|
||||||
int32_t oldLen = taosArrayGetSize(pExisted->aRowP);
|
int32_t oldLen = taosArrayGetSize(pExisted->aRowP);
|
||||||
int32_t newLen = taosArrayGetSize(pNew->aRowP);
|
int32_t newLen = taosArrayGetSize(pNew->aRowP);
|
||||||
|
|
||||||
|
@ -330,135 +332,6 @@ static int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData*
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) {
|
|
||||||
const SArray* pBlocks = (const SArray*)data;
|
|
||||||
SVnode* pVnode = (SVnode*)vnode;
|
|
||||||
int64_t suid = pTask->tbSink.stbUid;
|
|
||||||
char* stbFullName = pTask->tbSink.stbFullName;
|
|
||||||
STSchema* pTSchema = pTask->tbSink.pTSchema;
|
|
||||||
int32_t vgId = TD_VID(pVnode);
|
|
||||||
int32_t numOfBlocks = taosArrayGetSize(pBlocks);
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
const char* id = pTask->id.idStr;
|
|
||||||
|
|
||||||
if (pTask->tsInfo.sinkStart == 0) {
|
|
||||||
pTask->tsInfo.sinkStart = taosGetTimestampMs();
|
|
||||||
}
|
|
||||||
|
|
||||||
bool onlySubmitData = true;
|
|
||||||
for(int32_t i = 0; i < numOfBlocks; ++i) {
|
|
||||||
SSDataBlock* p = taosArrayGet(pBlocks, i);
|
|
||||||
if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) {
|
|
||||||
onlySubmitData = false;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!onlySubmitData) {
|
|
||||||
tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id,
|
|
||||||
numOfBlocks);
|
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfBlocks; ++i) {
|
|
||||||
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
|
||||||
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
|
|
||||||
code = doBuildAndSendDeleteMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
|
|
||||||
} else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
|
||||||
code = doBuildAndSendCreateTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
|
|
||||||
} else if (pDataBlock->info.type == STREAM_CHECKPOINT) {
|
|
||||||
continue;
|
|
||||||
} else {
|
|
||||||
pTask->sinkRecorder.numOfBlocks += 1;
|
|
||||||
|
|
||||||
SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
|
|
||||||
if (submitReq.aSubmitTbData == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version};
|
|
||||||
code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = setDstTableDataPayload(pTask, i, pDataBlock, &tbData);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayPush(submitReq.aSubmitTbData, &tbData);
|
|
||||||
code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, 1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, merge submit msg", vgId, id, numOfBlocks);
|
|
||||||
SHashObj* pTableIndexMap = taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
|
||||||
|
|
||||||
SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
|
|
||||||
if (submitReq.aSubmitTbData == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code));
|
|
||||||
taosHashCleanup(pTableIndexMap);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool hasSubmit = false;
|
|
||||||
for (int32_t i = 0; i < numOfBlocks; i++) {
|
|
||||||
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
|
||||||
if (pDataBlock->info.type == STREAM_CHECKPOINT) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
hasSubmit = true;
|
|
||||||
pTask->sinkRecorder.numOfBlocks += 1;
|
|
||||||
uint64_t groupId = pDataBlock->info.id.groupId;
|
|
||||||
|
|
||||||
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version};
|
|
||||||
|
|
||||||
int32_t* index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId));
|
|
||||||
if (index == NULL) { // no data yet, append it
|
|
||||||
code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = setDstTableDataPayload(pTask, i, pDataBlock, &tbData);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayPush(submitReq.aSubmitTbData, &tbData);
|
|
||||||
|
|
||||||
int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1;
|
|
||||||
taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size));
|
|
||||||
} else {
|
|
||||||
code = setDstTableDataPayload(pTask, i, pDataBlock, &tbData);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
SSubmitTbData* pExisted = taosArrayGet(submitReq.aSubmitTbData, *index);
|
|
||||||
code = doMergeExistedRows(pExisted, &tbData, id);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pTask->sinkRecorder.numOfRows += pDataBlock->info.rows;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosHashCleanup(pTableIndexMap);
|
|
||||||
|
|
||||||
if (hasSubmit) {
|
|
||||||
doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, numOfBlocks);
|
|
||||||
} else {
|
|
||||||
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
|
|
||||||
tqDebug("vgId:%d, s-task:%s write results completed", vgId, id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
|
int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
|
||||||
int64_t suid) {
|
int64_t suid) {
|
||||||
SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))};
|
SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))};
|
||||||
|
@ -498,22 +371,25 @@ int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock*
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool isValidDestChildTable(SMetaReader* pReader, int32_t vgId, char* ctbName, int64_t suid) {
|
bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid) {
|
||||||
if (pReader->me.type != TSDB_CHILD_TABLE) {
|
if (pReader->me.type != TSDB_CHILD_TABLE) {
|
||||||
tqError("vgId:%d, failed to write into %s, since table type:%d incorrect", vgId, ctbName, pReader->me.type);
|
tqError("vgId:%d, failed to write into %s, since table type:%d incorrect", vgId, ctbName, pReader->me.type);
|
||||||
|
terrno = TSDB_CODE_TDB_INVALID_TABLE_TYPE;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReader->me.ctbEntry.suid != suid) {
|
if (pReader->me.ctbEntry.suid != suid) {
|
||||||
tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid:%" PRId64 ", actual:%" PRId64,
|
tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid:%" PRId64 ", actual:%" PRId64,
|
||||||
vgId, ctbName, suid, pReader->me.ctbEntry.suid);
|
vgId, ctbName, suid, pReader->me.ctbEntry.suid);
|
||||||
|
terrno = TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
terrno = 0;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SVCreateTbReq* buildAutoCreateTableReq(char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock) {
|
SVCreateTbReq* buildAutoCreateTableReq(char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock) {
|
||||||
char* ctbName = pDataBlock->info.parTbName;
|
char* ctbName = pDataBlock->info.parTbName;
|
||||||
|
|
||||||
SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq));
|
SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq));
|
||||||
|
@ -570,11 +446,12 @@ static SVCreateTbReq* buildAutoCreateTableReq(char* stbFullName, int64_t suid, i
|
||||||
return pCreateTbReq;
|
return pCreateTbReq;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, uint64_t uid,
|
int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id) {
|
||||||
const char* id) {
|
if (tSimpleHashGetSize(pSinkTableMap) > MAX_CACHE_TABLE_INFO_NUM) {
|
||||||
pTableSinkInfo->uid = uid;
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t code = tqPutTableInfo(pSinkTableMap, groupId, pTableSinkInfo);
|
int32_t code = tSimpleHashPut(pSinkTableMap, &groupId, sizeof(uint64_t), &pTableSinkInfo, POINTER_BYTES);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
taosMemoryFreeClear(pTableSinkInfo);
|
taosMemoryFreeClear(pTableSinkInfo);
|
||||||
} else {
|
} else {
|
||||||
|
@ -625,7 +502,7 @@ int32_t tqBuildSubmitReq(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsAscendingSortFn(const void* p1, const void* p2) {
|
int32_t tsAscendingSortFn(const void* p1, const void* p2) {
|
||||||
SRow* pRow1 = *(SRow**) p1;
|
SRow* pRow1 = *(SRow**) p1;
|
||||||
SRow* pRow2 = *(SRow**) p2;
|
SRow* pRow2 = *(SRow**) p2;
|
||||||
|
|
||||||
|
@ -636,7 +513,7 @@ static int32_t tsAscendingSortFn(const void* p1, const void* p2) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doConvertRows(SSubmitTbData* pTableData, STSchema* pTSchema, SSDataBlock* pDataBlock, const char* id) {
|
int32_t doConvertRows(SSubmitTbData* pTableData, STSchema* pTSchema, SSDataBlock* pDataBlock, const char* id) {
|
||||||
int32_t numOfRows = pDataBlock->info.rows;
|
int32_t numOfRows = pDataBlock->info.rows;
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
@ -707,6 +584,46 @@ static int32_t doConvertRows(SSubmitTbData* pTableData, STSchema* pTSchema, SSDa
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo,
|
||||||
|
const char* dstTableName, int64_t* uid) {
|
||||||
|
int32_t vgId = TD_VID(pVnode);
|
||||||
|
int64_t suid = pTask->tbSink.stbUid;
|
||||||
|
const char* id = pTask->id.idStr;
|
||||||
|
|
||||||
|
while (pTableSinkInfo->uid == 0) {
|
||||||
|
if (streamTaskShouldStop(&pTask->status)) {
|
||||||
|
tqDebug("s-task:%s task will stop, quit from waiting for table:%s create", id, dstTableName);
|
||||||
|
return TSDB_CODE_STREAM_EXEC_CANCELLED;
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait for the table to be created
|
||||||
|
SMetaReader mr = {0};
|
||||||
|
metaReaderDoInit(&mr, pVnode->pMeta, 0);
|
||||||
|
|
||||||
|
int32_t code = metaGetTableEntryByName(&mr, dstTableName);
|
||||||
|
if (code == 0) { // table already exists, check its type and uid
|
||||||
|
bool isValid = isValidDstChildTable(&mr, vgId, dstTableName, suid);
|
||||||
|
if (isValid) { // not valid table, ignore it
|
||||||
|
tqDebug("s-task:%s set uid:%" PRIu64 " for dstTable:%s from meta", id, mr.me.uid, pTableSinkInfo->name.data);
|
||||||
|
ASSERT(terrno == 0);
|
||||||
|
|
||||||
|
// set the destination table uid
|
||||||
|
(*uid) = mr.me.uid;
|
||||||
|
pTableSinkInfo->uid = mr.me.uid;
|
||||||
|
}
|
||||||
|
|
||||||
|
metaReaderClear(&mr);
|
||||||
|
return terrno;
|
||||||
|
} else { // not exist, wait and retry
|
||||||
|
metaReaderClear(&mr);
|
||||||
|
taosMsleep(100);
|
||||||
|
tqDebug("s-task:%s wait 100ms for the table:%s ready before insert data", id, dstTableName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
|
int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
|
||||||
SSubmitTbData* pTableData) {
|
SSubmitTbData* pTableData) {
|
||||||
uint64_t groupId = pDataBlock->info.id.groupId;
|
uint64_t groupId = pDataBlock->info.id.groupId;
|
||||||
|
@ -719,6 +636,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
||||||
STableSinkInfo* pTableSinkInfo = NULL;
|
STableSinkInfo* pTableSinkInfo = NULL;
|
||||||
|
|
||||||
bool alreadyCached = tqGetTableInfo(pTask->tbSink.pTblInfo, groupId, &pTableSinkInfo);
|
bool alreadyCached = tqGetTableInfo(pTask->tbSink.pTblInfo, groupId, &pTableSinkInfo);
|
||||||
|
|
||||||
if (alreadyCached) {
|
if (alreadyCached) {
|
||||||
if (dstTableName[0] == 0) { // data block does not set the destination table name
|
if (dstTableName[0] == 0) { // data block does not set the destination table name
|
||||||
tstrncpy(dstTableName, pTableSinkInfo->name.data, pTableSinkInfo->name.len + 1);
|
tstrncpy(dstTableName, pTableSinkInfo->name.data, pTableSinkInfo->name.len + 1);
|
||||||
|
@ -741,6 +659,9 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
||||||
|
|
||||||
int32_t nameLen = strlen(dstTableName);
|
int32_t nameLen = strlen(dstTableName);
|
||||||
pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen);
|
pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen);
|
||||||
|
if (pTableSinkInfo == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
pTableSinkInfo->name.len = nameLen;
|
pTableSinkInfo->name.len = nameLen;
|
||||||
memcpy(pTableSinkInfo->name.data, dstTableName, nameLen);
|
memcpy(pTableSinkInfo->name.data, dstTableName, nameLen);
|
||||||
|
@ -752,45 +673,16 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
||||||
|
|
||||||
if (pTableData->uid == 0) {
|
if (pTableData->uid == 0) {
|
||||||
tqDebug("s-task:%s cached tableInfo uid is invalid, acquire it from meta", id);
|
tqDebug("s-task:%s cached tableInfo uid is invalid, acquire it from meta", id);
|
||||||
|
return doWaitForDstTableCreated(pVnode, pTask, pTableSinkInfo, dstTableName, &pTableData->uid);
|
||||||
|
} else {
|
||||||
|
tqDebug("s-task:%s set the dstTable uid from cache:%"PRId64, id, pTableData->uid);
|
||||||
}
|
}
|
||||||
|
|
||||||
while (pTableSinkInfo->uid == 0) {
|
|
||||||
if (streamTaskShouldStop(&pTask->status)) {
|
|
||||||
tqDebug("s-task:%s task will stop, quit from waiting for table:%s create", id, dstTableName);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
// wait for the table to be created
|
|
||||||
SMetaReader mr = {0};
|
|
||||||
metaReaderDoInit(&mr, pVnode->pMeta, 0);
|
|
||||||
|
|
||||||
int32_t code = metaGetTableEntryByName(&mr, dstTableName);
|
|
||||||
if (code == 0) { // table already exists, check its type and uid
|
|
||||||
bool isValid = isValidDestChildTable(&mr, vgId, dstTableName, suid);
|
|
||||||
if (!isValid) { // not valid table, ignore it
|
|
||||||
metaReaderClear(&mr);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
} else {
|
|
||||||
tqDebug("s-task:%s set uid:%" PRIu64 " for dstTable:%s from meta", id, mr.me.uid, pTableSinkInfo->name.data);
|
|
||||||
|
|
||||||
pTableData->uid = mr.me.uid;
|
|
||||||
pTableSinkInfo->uid = mr.me.uid;
|
|
||||||
metaReaderClear(&mr);
|
|
||||||
}
|
|
||||||
} else { // not exist, wait and retry
|
|
||||||
metaReaderClear(&mr);
|
|
||||||
taosMsleep(100);
|
|
||||||
tqDebug("s-task:%s wait 100ms for the table:%s ready before insert data", id, dstTableName);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
// todo: this check is not safe, and results in losing of submit message from WAL.
|
|
||||||
// The auto-create option will always set to be open for those submit messages, which arrive during the period
|
// The auto-create option will always set to be open for those submit messages, which arrive during the period
|
||||||
// the creating of the destination table, due to the absence of the user-specified table in TSDB. When scanning
|
// the creating of the destination table, due to the absence of the user-specified table in TSDB. When scanning
|
||||||
// data from WAL, those submit messages, with auto-created table option, will be discarded expect the first, for
|
// data from WAL, those submit messages, with auto-created table option, will be discarded expect the first, for
|
||||||
// those mismatched table uids. Only the FIRST table has the correct table uid, and those remain all have
|
// those mismatched table uids. Only the FIRST table has the correct table uid, and those remain all have
|
||||||
// randomly generated false table uid in the WAL.
|
// randomly generated, but false table uid in the WAL.
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
metaReaderDoInit(&mr, pVnode->pMeta, 0);
|
metaReaderDoInit(&mr, pVnode->pMeta, 0);
|
||||||
|
|
||||||
|
@ -808,19 +700,22 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, 0, id);
|
pTableSinkInfo->uid = 0;
|
||||||
|
doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, id);
|
||||||
} else {
|
} else {
|
||||||
bool isValid = isValidDestChildTable(&mr, vgId, dstTableName, suid);
|
bool isValid = isValidDstChildTable(&mr, vgId, dstTableName, suid);
|
||||||
if (!isValid) {
|
if (!isValid) {
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
taosMemoryFree(pTableSinkInfo);
|
taosMemoryFree(pTableSinkInfo);
|
||||||
tqError("s-task:%s vgId:%d table:%s already exists, but not child table, stream results is discarded", id, vgId,
|
tqError("s-task:%s vgId:%d table:%s already exists, but not child table, stream results is discarded", id, vgId,
|
||||||
dstTableName);
|
dstTableName);
|
||||||
return TSDB_CODE_PAR_DUPLICATED_COLUMN;
|
return terrno;
|
||||||
} else {
|
} else {
|
||||||
pTableData->uid = mr.me.uid;
|
pTableData->uid = mr.me.uid;
|
||||||
|
pTableSinkInfo->uid = mr.me.uid;
|
||||||
|
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, pTableData->uid, id);
|
doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -848,3 +743,140 @@ int32_t setDstTableDataPayload(SStreamTask* pTask, int32_t blockIndex, SSDataBlo
|
||||||
tqDebug("s-task:%s build submit msg for dstTable:%s, numOfRows:%d", id, dstTableName, numOfRows);
|
tqDebug("s-task:%s build submit msg for dstTable:%s, numOfRows:%d", id, dstTableName, numOfRows);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
||||||
|
const SArray* pBlocks = (const SArray*)data;
|
||||||
|
SVnode* pVnode = (SVnode*)vnode;
|
||||||
|
int64_t suid = pTask->tbSink.stbUid;
|
||||||
|
char* stbFullName = pTask->tbSink.stbFullName;
|
||||||
|
STSchema* pTSchema = pTask->tbSink.pTSchema;
|
||||||
|
int32_t vgId = TD_VID(pVnode);
|
||||||
|
int32_t numOfBlocks = taosArrayGetSize(pBlocks);
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
const char* id = pTask->id.idStr;
|
||||||
|
|
||||||
|
if (pTask->tsInfo.sinkStart == 0) {
|
||||||
|
pTask->tsInfo.sinkStart = taosGetTimestampMs();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool onlySubmitData = true;
|
||||||
|
for(int32_t i = 0; i < numOfBlocks; ++i) {
|
||||||
|
SSDataBlock* p = taosArrayGet(pBlocks, i);
|
||||||
|
if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||||
|
onlySubmitData = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!onlySubmitData) {
|
||||||
|
tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id,
|
||||||
|
numOfBlocks);
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < numOfBlocks; ++i) {
|
||||||
|
if (streamTaskShouldStop(&pTask->status)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
||||||
|
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
|
||||||
|
code = doBuildAndSendDeleteMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
|
||||||
|
} else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||||
|
code = doBuildAndSendCreateTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
|
||||||
|
} else if (pDataBlock->info.type == STREAM_CHECKPOINT) {
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
pTask->sinkRecorder.numOfBlocks += 1;
|
||||||
|
|
||||||
|
SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
|
||||||
|
if (submitReq.aSubmitTbData == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version};
|
||||||
|
code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = setDstTableDataPayload(pTask, i, pDataBlock, &tbData);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayPush(submitReq.aSubmitTbData, &tbData);
|
||||||
|
code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, merge submit msg", vgId, id, numOfBlocks);
|
||||||
|
SHashObj* pTableIndexMap = taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
|
|
||||||
|
SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
|
||||||
|
if (submitReq.aSubmitTbData == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code));
|
||||||
|
taosHashCleanup(pTableIndexMap);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool hasSubmit = false;
|
||||||
|
for (int32_t i = 0; i < numOfBlocks; i++) {
|
||||||
|
if (streamTaskShouldStop(&pTask->status)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
||||||
|
if (pDataBlock->info.type == STREAM_CHECKPOINT) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
hasSubmit = true;
|
||||||
|
pTask->sinkRecorder.numOfBlocks += 1;
|
||||||
|
uint64_t groupId = pDataBlock->info.id.groupId;
|
||||||
|
|
||||||
|
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version};
|
||||||
|
|
||||||
|
int32_t* index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId));
|
||||||
|
if (index == NULL) { // no data yet, append it
|
||||||
|
code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = setDstTableDataPayload(pTask, i, pDataBlock, &tbData);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayPush(submitReq.aSubmitTbData, &tbData);
|
||||||
|
|
||||||
|
int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1;
|
||||||
|
taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size));
|
||||||
|
} else {
|
||||||
|
code = setDstTableDataPayload(pTask, i, pDataBlock, &tbData);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSubmitTbData* pExisted = taosArrayGet(submitReq.aSubmitTbData, *index);
|
||||||
|
code = doMergeExistedRows(pExisted, &tbData, id);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pTask->sinkRecorder.numOfRows += pDataBlock->info.rows;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosHashCleanup(pTableIndexMap);
|
||||||
|
|
||||||
|
if (hasSubmit) {
|
||||||
|
doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, numOfBlocks);
|
||||||
|
} else {
|
||||||
|
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
|
||||||
|
tqDebug("vgId:%d, s-task:%s write results completed", vgId, id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -651,8 +651,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SAME_COMMITTED_VALUE, "Same committed valu
|
||||||
|
|
||||||
// stream
|
// stream
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")
|
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_BACKPRESSURE_OUT_OF_QUEUE,"Out of memory in stream queue")
|
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_EXEC_CANCELLED, "Stream task exec cancelled")
|
||||||
|
|
||||||
// TDLite
|
// TDLite
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_DIR, "Invalid TDLite open directory")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_DIR, "Invalid TDLite open directory")
|
||||||
|
|
Loading…
Reference in New Issue