Merge pull request #22669 from taosdata/fix/TD-26069
fix invalid insert
This commit is contained in:
commit
e816d0efa5
|
@ -416,6 +416,7 @@ void tFreeStreamTask(SStreamTask* pTask);
|
||||||
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver);
|
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver);
|
||||||
|
|
||||||
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo);
|
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo);
|
||||||
|
int32_t tDecodeStreamTaskId(SDecoder* pDecoder, SStreamTaskId* pTaskId);
|
||||||
|
|
||||||
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem);
|
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem);
|
||||||
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask);
|
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask);
|
||||||
|
|
|
@ -128,7 +128,7 @@ int32_t tsQueryPolicy = 1;
|
||||||
int32_t tsQueryRspPolicy = 0;
|
int32_t tsQueryRspPolicy = 0;
|
||||||
int64_t tsQueryMaxConcurrentTables = 200; // unit is TSDB_TABLE_NUM_UNIT
|
int64_t tsQueryMaxConcurrentTables = 200; // unit is TSDB_TABLE_NUM_UNIT
|
||||||
bool tsEnableQueryHb = true;
|
bool tsEnableQueryHb = true;
|
||||||
bool tsEnableScience = false; // on taos-cli show float and doulbe with scientific notation if true
|
bool tsEnableScience = false; // on taos-cli show float and doulbe with scientific notation if true
|
||||||
int32_t tsQuerySmaOptimize = 0;
|
int32_t tsQuerySmaOptimize = 0;
|
||||||
int32_t tsQueryRsmaTolerance = 1000; // the tolerance time (ms) to judge from which level to query rsma data.
|
int32_t tsQueryRsmaTolerance = 1000; // the tolerance time (ms) to judge from which level to query rsma data.
|
||||||
bool tsQueryPlannerTrace = false;
|
bool tsQueryPlannerTrace = false;
|
||||||
|
@ -240,11 +240,11 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch
|
||||||
// internal
|
// internal
|
||||||
int32_t tsTransPullupInterval = 2;
|
int32_t tsTransPullupInterval = 2;
|
||||||
int32_t tsMqRebalanceInterval = 2;
|
int32_t tsMqRebalanceInterval = 2;
|
||||||
int32_t tsStreamCheckpointTickInterval = 20;
|
int32_t tsStreamCheckpointTickInterval = 600;
|
||||||
int32_t tsStreamNodeCheckInterval = 10;
|
int32_t tsStreamNodeCheckInterval = 10;
|
||||||
int32_t tsTtlUnit = 86400;
|
int32_t tsTtlUnit = 86400;
|
||||||
int32_t tsTtlPushIntervalSec = 10;
|
int32_t tsTtlPushIntervalSec = 10;
|
||||||
int32_t tsTrimVDbIntervalSec = 60 * 60; // interval of trimming db in all vgroups
|
int32_t tsTrimVDbIntervalSec = 60 * 60; // interval of trimming db in all vgroups
|
||||||
int32_t tsGrantHBInterval = 60;
|
int32_t tsGrantHBInterval = 60;
|
||||||
int32_t tsUptimeInterval = 300; // seconds
|
int32_t tsUptimeInterval = 300; // seconds
|
||||||
char tsUdfdResFuncs[512] = ""; // udfd resident funcs that teardown when udfd exits
|
char tsUdfdResFuncs[512] = ""; // udfd resident funcs that teardown when udfd exits
|
||||||
|
|
|
@ -235,27 +235,23 @@ int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t
|
||||||
STqHandle handle;
|
STqHandle handle;
|
||||||
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
|
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
|
||||||
if (pHdr->type == SNAP_DATA_STREAM_TASK) {
|
if (pHdr->type == SNAP_DATA_STREAM_TASK) {
|
||||||
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
SStreamTaskId task = {0};
|
||||||
if (pTask == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
SDecoder decoder;
|
SDecoder decoder;
|
||||||
tDecoderInit(&decoder, (uint8_t*)pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
|
tDecoderInit(&decoder, (uint8_t*)pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
|
||||||
code = tDecodeStreamTask(&decoder, pTask);
|
|
||||||
|
code = tDecodeStreamTaskId(&decoder, &task);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
taosMemoryFree(pTask);
|
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
// tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn)
|
// tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn)
|
||||||
if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t),
|
int64_t key[2] = {task.streamId, task.taskId};
|
||||||
(uint8_t*)pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr), pWriter->txn) < 0) {
|
if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr),
|
||||||
taosMemoryFree(pTask);
|
nData - sizeof(SSnapDataHdr), pWriter->txn) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
taosMemoryFree(pTask);
|
|
||||||
} else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) {
|
} else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) {
|
||||||
// do nothing
|
// do nothing
|
||||||
}
|
}
|
||||||
|
|
|
@ -251,6 +251,18 @@ int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo)
|
||||||
tEndDecode(pDecoder);
|
tEndDecode(pDecoder);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
int32_t tDecodeStreamTaskId(SDecoder* pDecoder, SStreamTaskId* pTaskId) {
|
||||||
|
int64_t ver;
|
||||||
|
if (tStartDecode(pDecoder) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &ver) < 0) return -1;
|
||||||
|
if (ver != SSTREAM_TASK_VER) return -1;
|
||||||
|
|
||||||
|
if (tDecodeI64(pDecoder, &pTaskId->streamId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pTaskId->taskId) < 0) return -1;
|
||||||
|
|
||||||
|
tEndDecode(pDecoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static void freeItem(void* p) {
|
static void freeItem(void* p) {
|
||||||
SStreamContinueExecInfo* pInfo = p;
|
SStreamContinueExecInfo* pInfo = p;
|
||||||
|
@ -538,4 +550,4 @@ void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr);
|
qDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue