feat(stream): sink table can be other db
This commit is contained in:
parent
9e4b3adc7c
commit
b23fa19bc4
|
@ -1751,7 +1751,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
|
||||||
taosArrayClear(tagArray);
|
taosArrayClear(tagArray);
|
||||||
taosArrayPush(tagArray, &tagVal);
|
taosArrayPush(tagArray, &tagVal);
|
||||||
tTagNew(tagArray, 1, false, &pTag);
|
tTagNew(tagArray, 1, false, &pTag);
|
||||||
if (!pTag) {
|
if (pTag == NULL) {
|
||||||
tdDestroySVCreateTbReq(&createTbReq);
|
tdDestroySVCreateTbReq(&createTbReq);
|
||||||
taosArrayDestroy(tagArray);
|
taosArrayDestroy(tagArray);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -1762,9 +1762,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
|
||||||
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
|
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
|
||||||
|
|
||||||
tdDestroySVCreateTbReq(&createTbReq);
|
tdDestroySVCreateTbReq(&createTbReq);
|
||||||
|
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
tdDestroySVCreateTbReq(&createTbReq);
|
|
||||||
taosArrayDestroy(tagArray);
|
taosArrayDestroy(tagArray);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -1803,8 +1801,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
|
||||||
int32_t schemaLen = 0;
|
int32_t schemaLen = 0;
|
||||||
if (createTb) {
|
if (createTb) {
|
||||||
SVCreateTbReq createTbReq = {0};
|
SVCreateTbReq createTbReq = {0};
|
||||||
char* cname = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
|
char* cname = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId);
|
||||||
snprintf(cname, TSDB_TABLE_FNAME_LEN, "%s:%ld", stbFullName, pDataBlock->info.groupId);
|
|
||||||
createTbReq.name = cname;
|
createTbReq.name = cname;
|
||||||
createTbReq.flags = 0;
|
createTbReq.flags = 0;
|
||||||
createTbReq.type = TSDB_CHILD_TABLE;
|
createTbReq.type = TSDB_CHILD_TABLE;
|
||||||
|
@ -1818,7 +1815,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
|
||||||
taosArrayPush(tagArray, &tagVal);
|
taosArrayPush(tagArray, &tagVal);
|
||||||
STag* pTag = NULL;
|
STag* pTag = NULL;
|
||||||
tTagNew(tagArray, 1, false, &pTag);
|
tTagNew(tagArray, 1, false, &pTag);
|
||||||
if (!pTag) {
|
if (pTag == NULL) {
|
||||||
tdDestroySVCreateTbReq(&createTbReq);
|
tdDestroySVCreateTbReq(&createTbReq);
|
||||||
taosArrayDestroy(tagArray);
|
taosArrayDestroy(tagArray);
|
||||||
taosMemoryFreeClear(ret);
|
taosMemoryFreeClear(ret);
|
||||||
|
@ -1944,7 +1941,6 @@ void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen
|
||||||
|
|
||||||
const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData) {
|
const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData) {
|
||||||
blockDataEnsureCapacity(pBlock, numOfRows);
|
blockDataEnsureCapacity(pBlock, numOfRows);
|
||||||
pBlock->info.rows = numOfRows;
|
|
||||||
|
|
||||||
const char* pStart = pData;
|
const char* pStart = pData;
|
||||||
|
|
||||||
|
@ -2018,6 +2014,7 @@ const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t
|
||||||
pStart += colLen[i];
|
pStart += colLen[i];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pBlock->info.rows = numOfRows;
|
||||||
ASSERT(pStart - pData == dataLen);
|
ASSERT(pStart - pData == dataLen);
|
||||||
return pStart;
|
return pStart;
|
||||||
}
|
}
|
||||||
|
|
|
@ -186,7 +186,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
|
||||||
SVgObj* pVgroup;
|
SVgObj* pVgroup;
|
||||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
|
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
if (pVgroup->dbUid != pStream->dbUid) {
|
if (strcmp(pVgroup->dbName, pStream->targetDb) != 0) {
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -286,7 +286,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
pStream->tasks = taosArrayInit(totLevel, sizeof(void*));
|
pStream->tasks = taosArrayInit(totLevel, sizeof(void*));
|
||||||
|
|
||||||
bool hasExtraSink = false;
|
bool hasExtraSink = false;
|
||||||
if (totLevel == 2 || strcmp(pStream->sourceDb, pStream->targetDb) != 0) {
|
bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
|
||||||
|
if (totLevel == 2 || externalTargetDB) {
|
||||||
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
|
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
|
||||||
taosArrayPush(pStream->tasks, &taskOneLevel);
|
taosArrayPush(pStream->tasks, &taskOneLevel);
|
||||||
// add extra sink
|
// add extra sink
|
||||||
|
@ -405,7 +406,6 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
if (pStream->fixedSinkVgId == 0) {
|
if (pStream->fixedSinkVgId == 0) {
|
||||||
pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
|
pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
|
||||||
|
|
||||||
/*pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;*/
|
|
||||||
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
|
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
|
||||||
SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
|
SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
|
||||||
ASSERT(pDb);
|
ASSERT(pDb);
|
||||||
|
@ -426,10 +426,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
|
SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
|
||||||
for (int32_t j = 0; j < sinkLvSize; j++) {
|
for (int32_t j = 0; j < sinkLvSize; j++) {
|
||||||
SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j);
|
SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j);
|
||||||
/*printf("vgid %d node id %d\n", pVgInfo->vgId, pTask->nodeId);*/
|
|
||||||
if (pLastLevelTask->nodeId == pVgInfo->vgId) {
|
if (pLastLevelTask->nodeId == pVgInfo->vgId) {
|
||||||
pVgInfo->taskId = pLastLevelTask->taskId;
|
pVgInfo->taskId = pLastLevelTask->taskId;
|
||||||
/*printf("taskid %d set to %d\n", pVgInfo->taskId, pTask->taskId);*/
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -253,6 +253,8 @@ int walRoll(SWal *pWal) {
|
||||||
|
|
||||||
static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
|
static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
|
||||||
SWalIdxEntry entry = {.ver = ver, .offset = offset};
|
SWalIdxEntry entry = {.ver = ver, .offset = offset};
|
||||||
|
/*int64_t idxOffset = taosLSeekFile(pWal->pWriteIdxTFile, 0, SEEK_CUR);*/
|
||||||
|
/*wDebug("write index: ver: %ld, offset: %ld, at %ld", ver, offset, idxOffset);*/
|
||||||
int size = taosWriteFile(pWal->pWriteIdxTFile, &entry, sizeof(SWalIdxEntry));
|
int size = taosWriteFile(pWal->pWriteIdxTFile, &entry, sizeof(SWalIdxEntry));
|
||||||
if (size != sizeof(SWalIdxEntry)) {
|
if (size != sizeof(SWalIdxEntry)) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
|
|
@ -433,9 +433,6 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence) {
|
int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence) {
|
||||||
if (pFile == NULL) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
#if FILE_WITH_LOCK
|
#if FILE_WITH_LOCK
|
||||||
taosThreadRwlockRdlock(&(pFile->rwlock));
|
taosThreadRwlockRdlock(&(pFile->rwlock));
|
||||||
#endif
|
#endif
|
||||||
|
|
Loading…
Reference in New Issue