fix(stream): fix syntax error
This commit is contained in:
parent
b2a905bd91
commit
d7f7a7d6cf
|
@ -1330,59 +1330,6 @@ int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock) {
|
|
||||||
SDecoder* pCoder = &(SDecoder){0};
|
|
||||||
SDeleteRes* pRes = &(SDeleteRes){0};
|
|
||||||
|
|
||||||
(*pRefBlock) = NULL;
|
|
||||||
|
|
||||||
pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
|
|
||||||
if (pRes->uidList == NULL) {
|
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
|
|
||||||
tDecoderInit(pCoder, (uint8_t*)pData, len);
|
|
||||||
tDecodeDeleteRes(pCoder, pRes);
|
|
||||||
tDecoderClear(pCoder);
|
|
||||||
|
|
||||||
int32_t numOfTables = taosArrayGetSize(pRes->uidList);
|
|
||||||
if (numOfTables == 0 || pRes->affectedRows == 0) {
|
|
||||||
taosArrayDestroy(pRes->uidList);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
|
|
||||||
blockDataEnsureCapacity(pDelBlock, numOfTables);
|
|
||||||
pDelBlock->info.rows = numOfTables;
|
|
||||||
pDelBlock->info.version = ver;
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTables; i++) {
|
|
||||||
// start key column
|
|
||||||
SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
|
||||||
colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false); // end key column
|
|
||||||
SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
|
||||||
colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);
|
|
||||||
// uid column
|
|
||||||
SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
|
|
||||||
int64_t* pUid = taosArrayGet(pRes->uidList, i);
|
|
||||||
colDataSetVal(pUidCol, i, (const char*)pUid, false);
|
|
||||||
|
|
||||||
colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX), i);
|
|
||||||
colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), i);
|
|
||||||
colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), i);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayDestroy(pRes->uidList);
|
|
||||||
*pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
|
|
||||||
if ((*pRefBlock) == NULL) {
|
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
|
|
||||||
(*pRefBlock)->type = STREAM_INPUT__REF_DATA_BLOCK;
|
|
||||||
(*pRefBlock)->pBlock = pDelBlock;
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
SStreamTaskRunReq* pReq = pMsg->pCont;
|
SStreamTaskRunReq* pReq = pMsg->pCont;
|
||||||
|
|
||||||
|
|
|
@ -454,7 +454,3 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream
|
||||||
(*pRefBlock)->pBlock = pDelBlock;
|
(*pRefBlock)->pBlock = pDelBlock;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqCreateCheckpointBlock(SStreamCheckpoint** pCheckpointBlock) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
|
@ -598,7 +598,7 @@ int32_t streamTaskReloadState(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamAlignTransferState(SStreamTask* pTask) {
|
int32_t streamAlignTransferState(SStreamTask* pTask) {
|
||||||
int32_t numOfUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList);
|
int32_t numOfUpstream = taosArrayGetSize(pTask->pUpstreamInfoList);
|
||||||
int32_t old = atomic_val_compare_exchange_32(&pTask->transferStateAlignCnt, 0, numOfUpstream);
|
int32_t old = atomic_val_compare_exchange_32(&pTask->transferStateAlignCnt, 0, numOfUpstream);
|
||||||
if (old == 0) {
|
if (old == 0) {
|
||||||
qDebug("s-task:%s set the transfer state aligncnt %d", pTask->id.idStr, numOfUpstream);
|
qDebug("s-task:%s set the transfer state aligncnt %d", pTask->id.idStr, numOfUpstream);
|
||||||
|
|
Loading…
Reference in New Issue