Merge pull request #25084 from taosdata/fix/3_liaohj
fix(tsdb): set strict varchar length check.
This commit is contained in:
commit
15663ab450
|
@ -860,6 +860,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
|
||||||
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
|
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
|
||||||
pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory,
|
pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory,
|
||||||
(int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam, nextProcessVer);
|
(int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam, nextProcessVer);
|
||||||
|
|
||||||
|
ASSERT(pChkInfo->checkpointVer <= pChkInfo->nextProcessVer);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -658,11 +658,12 @@ static int32_t doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int
|
||||||
colDataSetNULL(pColInfoData, rowIndex);
|
colDataSetNULL(pColInfoData, rowIndex);
|
||||||
} else {
|
} else {
|
||||||
varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
|
varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
|
||||||
if (pColVal->value.nData > pColInfoData->info.bytes) {
|
if ((pColVal->value.nData + VARSTR_HEADER_SIZE) > pColInfoData->info.bytes) {
|
||||||
tsdbWarn("column cid:%d actual data len %d is bigger than schema len %d", pColVal->cid, pColVal->value.nData,
|
tsdbWarn("column cid:%d actual data len %d is bigger than schema len %d", pColVal->cid, pColVal->value.nData,
|
||||||
pColInfoData->info.bytes);
|
pColInfoData->info.bytes);
|
||||||
return TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
|
return TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pColVal->value.nData > 0) { // pData may be null, if nData is 0
|
if (pColVal->value.nData > 0) { // pData may be null, if nData is 0
|
||||||
memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);
|
memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);
|
||||||
}
|
}
|
||||||
|
|
|
@ -456,12 +456,49 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
||||||
stDebug("s-task:0x%x free task completed", taskId);
|
stDebug("s-task:0x%x free task completed", taskId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void setInitialVersionInfo(SStreamTask* pTask, int64_t ver) {
|
||||||
|
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
|
||||||
|
SDataRange* pRange = &pTask->dataRange;
|
||||||
|
|
||||||
|
// only set the version info for stream tasks without fill-history task
|
||||||
|
if ((pTask->info.fillHistory == 0) && (!HAS_RELATED_FILLHISTORY_TASK(pTask))) {
|
||||||
|
pChkInfo->checkpointVer = ver - 1; // only update when generating checkpoint
|
||||||
|
pChkInfo->processedVer = ver - 1; // already processed version
|
||||||
|
pChkInfo->nextProcessVer = ver; // next processed version
|
||||||
|
|
||||||
|
pRange->range.maxVer = ver;
|
||||||
|
pRange->range.minVer = ver;
|
||||||
|
} else {
|
||||||
|
// the initial value of processedVer/nextProcessVer/checkpointVer for stream task with related fill-history task
|
||||||
|
// is set at the mnode.
|
||||||
|
if (pTask->info.fillHistory == 1) {
|
||||||
|
pChkInfo->checkpointVer = pRange->range.maxVer;
|
||||||
|
pChkInfo->processedVer = pRange->range.maxVer;
|
||||||
|
pChkInfo->nextProcessVer = pRange->range.maxVer + 1;
|
||||||
|
} else {
|
||||||
|
pChkInfo->checkpointVer = pRange->range.minVer - 1;
|
||||||
|
pChkInfo->processedVer = pRange->range.minVer - 1;
|
||||||
|
pChkInfo->nextProcessVer = pRange->range.minVer;
|
||||||
|
|
||||||
|
{ // for compatible purpose, remove it later
|
||||||
|
if (pRange->range.minVer == 0) {
|
||||||
|
pChkInfo->checkpointVer = 0;
|
||||||
|
pChkInfo->processedVer = 0;
|
||||||
|
pChkInfo->nextProcessVer = 1;
|
||||||
|
stDebug("s-task:%s update the processedVer to 0 from -1 due to compatible purpose", pTask->id.idStr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) {
|
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) {
|
||||||
pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
|
pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
|
||||||
pTask->refCnt = 1;
|
pTask->refCnt = 1;
|
||||||
|
|
||||||
pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
|
pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
|
||||||
pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
|
pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
|
||||||
|
|
||||||
pTask->inputq.queue = streamQueueOpen(512 << 10);
|
pTask->inputq.queue = streamQueueOpen(512 << 10);
|
||||||
pTask->outputq.queue = streamQueueOpen(512 << 10);
|
pTask->outputq.queue = streamQueueOpen(512 << 10);
|
||||||
if (pTask->inputq.queue == NULL || pTask->outputq.queue == NULL) {
|
if (pTask->inputq.queue == NULL || pTask->outputq.queue == NULL) {
|
||||||
|
@ -479,41 +516,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
|
||||||
}
|
}
|
||||||
|
|
||||||
pTask->execInfo.created = taosGetTimestampMs();
|
pTask->execInfo.created = taosGetTimestampMs();
|
||||||
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
|
setInitialVersionInfo(pTask, ver);
|
||||||
SDataRange* pRange = &pTask->dataRange;
|
|
||||||
|
|
||||||
// only set the version info for stream tasks without fill-history task
|
|
||||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
|
||||||
if ((pTask->info.fillHistory == 0) && (!HAS_RELATED_FILLHISTORY_TASK(pTask))) {
|
|
||||||
pChkInfo->checkpointVer = ver - 1; // only update when generating checkpoint
|
|
||||||
pChkInfo->processedVer = ver - 1; // already processed version
|
|
||||||
pChkInfo->nextProcessVer = ver; // next processed version
|
|
||||||
|
|
||||||
pRange->range.maxVer = ver;
|
|
||||||
pRange->range.minVer = ver;
|
|
||||||
} else {
|
|
||||||
// the initial value of processedVer/nextProcessVer/checkpointVer for stream task with related fill-history task
|
|
||||||
// is set at the mnode.
|
|
||||||
if (pTask->info.fillHistory == 1) {
|
|
||||||
pChkInfo->checkpointVer = pRange->range.maxVer;
|
|
||||||
pChkInfo->processedVer = pRange->range.maxVer;
|
|
||||||
pChkInfo->nextProcessVer = pRange->range.maxVer + 1;
|
|
||||||
} else {
|
|
||||||
pChkInfo->checkpointVer = pRange->range.minVer - 1;
|
|
||||||
pChkInfo->processedVer = pRange->range.minVer - 1;
|
|
||||||
pChkInfo->nextProcessVer = pRange->range.minVer;
|
|
||||||
|
|
||||||
{ // for compatible purpose, remove it later
|
|
||||||
if (pRange->range.minVer == 0) {
|
|
||||||
pChkInfo->checkpointVer = 0;
|
|
||||||
pChkInfo->processedVer = 0;
|
|
||||||
pChkInfo->nextProcessVer = 1;
|
|
||||||
stDebug("s-task:%s update the processedVer to 0 from -1 due to compatible purpose", pTask->id.idStr);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pTask->pMeta = pMeta;
|
pTask->pMeta = pMeta;
|
||||||
pTask->pMsgCb = pMsgCb;
|
pTask->pMsgCb = pMsgCb;
|
||||||
|
|
Loading…
Reference in New Issue