refactor: do some internal refactor.
This commit is contained in:
parent
8e15c64499
commit
3d00f6ce54
|
@ -19,10 +19,10 @@
|
||||||
|
|
||||||
#define MAX_CATCH_NUM 10240
|
#define MAX_CATCH_NUM 10240
|
||||||
|
|
||||||
typedef struct STblInfo {
|
typedef struct STableSinkInfo {
|
||||||
uint64_t uid;
|
uint64_t uid;
|
||||||
char tbName[TSDB_TABLE_NAME_LEN];
|
char tbName[TSDB_TABLE_NAME_LEN];
|
||||||
} STblInfo;
|
} STableSinkInfo;
|
||||||
|
|
||||||
int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
|
int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
|
||||||
const char* pIdStr) {
|
const char* pIdStr) {
|
||||||
|
@ -97,16 +97,17 @@ end:
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqGetTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STblInfo** pTbl) {
|
static int32_t tqGetTableInfo(SSHashObj* pTableInfoMap,uint64_t groupId, STableSinkInfo** pInfo) {
|
||||||
void* pVal = tSimpleHashGet(tblInfo, &groupId, sizeof(uint64_t));
|
void* pVal = tSimpleHashGet(pTableInfoMap, &groupId, sizeof(uint64_t));
|
||||||
if (pVal) {
|
if (pVal) {
|
||||||
*pTbl = *(STblInfo**)pVal;
|
*pInfo = *(STableSinkInfo**)pVal;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqPutTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STblInfo* pTbl) {
|
int32_t tqPutTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STableSinkInfo* pTbl) {
|
||||||
if (tSimpleHashGetSize(tblInfo) > MAX_CATCH_NUM) {
|
if (tSimpleHashGetSize(tblInfo) > MAX_CATCH_NUM) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -274,7 +275,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
|
||||||
crTblArray = NULL;
|
crTblArray = NULL;
|
||||||
} else {
|
} else {
|
||||||
SSubmitTbData tbData = {0};
|
SSubmitTbData tbData = {0};
|
||||||
tqDebug("tq sink pipe, convert block1 %d, rows: %d", i, rows);
|
tqDebug("tq sink pipe, convert block:%d, rows:%d", i, rows);
|
||||||
|
|
||||||
if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
|
if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
|
||||||
goto _end;
|
goto _end;
|
||||||
|
@ -283,35 +284,35 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
|
||||||
tbData.suid = suid;
|
tbData.suid = suid;
|
||||||
tbData.uid = 0; // uid is assigned by vnode
|
tbData.uid = 0; // uid is assigned by vnode
|
||||||
tbData.sver = pTSchema->version;
|
tbData.sver = pTSchema->version;
|
||||||
STblInfo* pTblMeta = NULL;
|
|
||||||
|
|
||||||
int32_t res = tqGetTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, &pTblMeta);
|
STableSinkInfo* pTableSinkInfo = NULL;
|
||||||
|
int32_t res = tqGetTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, &pTableSinkInfo);
|
||||||
if (res != TSDB_CODE_SUCCESS) {
|
if (res != TSDB_CODE_SUCCESS) {
|
||||||
pTblMeta = taosMemoryCalloc(1, sizeof(STblInfo));
|
pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo));
|
||||||
}
|
}
|
||||||
|
|
||||||
char* ctbName = pDataBlock->info.parTbName;
|
char* ctbName = pDataBlock->info.parTbName;
|
||||||
if (!ctbName[0]) {
|
if (!ctbName[0]) {
|
||||||
if (res == TSDB_CODE_SUCCESS) {
|
if (res == TSDB_CODE_SUCCESS) {
|
||||||
memcpy(ctbName, pTblMeta->tbName, strlen(pTblMeta->tbName));
|
memcpy(ctbName, pTableSinkInfo->tbName, strlen(pTableSinkInfo->tbName));
|
||||||
} else {
|
} else {
|
||||||
char* tmp = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId);
|
char* tmp = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId);
|
||||||
memcpy(ctbName, tmp, strlen(tmp));
|
memcpy(ctbName, tmp, strlen(tmp));
|
||||||
memcpy(pTblMeta->tbName, tmp, strlen(tmp));
|
memcpy(pTableSinkInfo->tbName, tmp, strlen(tmp));
|
||||||
taosMemoryFree(tmp);
|
taosMemoryFree(tmp);
|
||||||
tqDebug("vgId:%d, gropuid:%" PRIu64 " datablock tabel name is null", TD_VID(pVnode),
|
tqDebug("vgId:%d, gropuId:%" PRIu64 " datablock table name is null", TD_VID(pVnode),
|
||||||
pDataBlock->info.id.groupId);
|
pDataBlock->info.id.groupId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (res == TSDB_CODE_SUCCESS) {
|
if (res == TSDB_CODE_SUCCESS) {
|
||||||
tbData.uid = pTblMeta->uid;
|
tbData.uid = pTableSinkInfo->uid;
|
||||||
} else {
|
} else {
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
metaReaderInit(&mr, pVnode->pMeta, 0);
|
metaReaderInit(&mr, pVnode->pMeta, 0);
|
||||||
if (metaGetTableEntryByName(&mr, ctbName) < 0) {
|
if (metaGetTableEntryByName(&mr, ctbName) < 0) {
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
taosMemoryFree(pTblMeta);
|
taosMemoryFree(pTableSinkInfo);
|
||||||
tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName);
|
tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName);
|
||||||
|
|
||||||
SVCreateTbReq* pCreateTbReq = NULL;
|
SVCreateTbReq* pCreateTbReq = NULL;
|
||||||
|
@ -371,7 +372,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
|
||||||
tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName,
|
tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName,
|
||||||
mr.me.type);
|
mr.me.type);
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
taosMemoryFree(pTblMeta);
|
taosMemoryFree(pTableSinkInfo);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -380,13 +381,13 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
|
||||||
", actual suid %" PRId64 "",
|
", actual suid %" PRId64 "",
|
||||||
TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid);
|
TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid);
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
taosMemoryFree(pTblMeta);
|
taosMemoryFree(pTableSinkInfo);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
tbData.uid = mr.me.uid;
|
tbData.uid = mr.me.uid;
|
||||||
pTblMeta->uid = mr.me.uid;
|
pTableSinkInfo->uid = mr.me.uid;
|
||||||
tqPutTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, pTblMeta);
|
tqPutTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, pTableSinkInfo);
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -284,7 +284,9 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
|
||||||
}
|
}
|
||||||
|
|
||||||
bool tInputQueueIsFull(const SStreamTask* pTask) {
|
bool tInputQueueIsFull(const SStreamTask* pTask) {
|
||||||
return taosQueueItemSize((pTask->inputQueue->queue)) >= STREAM_TASK_INPUT_QUEUEU_CAPACITY;
|
bool isFull = taosQueueItemSize((pTask->inputQueue->queue)) >= STREAM_TASK_INPUT_QUEUEU_CAPACITY;
|
||||||
|
double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue);
|
||||||
|
return (isFull || size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
||||||
|
@ -298,8 +300,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
||||||
qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
||||||
px->submit.msgLen, px->submit.ver, numOfBlocks, size);
|
px->submit.msgLen, px->submit.ver, numOfBlocks, size);
|
||||||
|
|
||||||
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) &&
|
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && tInputQueueIsFull(pTask)) {
|
||||||
(numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) {
|
|
||||||
qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr,
|
qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr,
|
||||||
STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE,
|
STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE,
|
||||||
numOfBlocks, size);
|
numOfBlocks, size);
|
||||||
|
@ -320,8 +321,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
||||||
int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1;
|
int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1;
|
||||||
double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue);
|
double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue);
|
||||||
|
|
||||||
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) &&
|
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) {
|
||||||
(numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) {
|
|
||||||
qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
|
qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
|
||||||
pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks,
|
pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks,
|
||||||
size);
|
size);
|
||||||
|
|
Loading…
Reference in New Issue