refactor: support submitreq2
This commit is contained in:
parent
df251e9750
commit
271822ebc4
|
@ -389,7 +389,7 @@ int32_t tqReaderSetSubmitReq2(STqReader* pReader, void* msgStr, int32_t msgLen,
|
|||
pReader->msg2.ver = ver;
|
||||
pReader->ver = ver;
|
||||
|
||||
tqDebug("tq reader set msg %p", msgStr);
|
||||
tqDebug("tq reader set msg %p %d", msgStr, msgLen);
|
||||
|
||||
if (pReader->setMsg == 0) {
|
||||
SDecoder decoder;
|
||||
|
@ -683,6 +683,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) {
|
|||
} else if (pCol->cid == pColData->info.colId) {
|
||||
for (int32_t i = 0; i < pCol->nVal; i++) {
|
||||
tColDataGetValue(pCol, sourceIdx, &colVal);
|
||||
#if 0
|
||||
void* val = NULL;
|
||||
if (IS_STR_DATA_TYPE(colVal.type)) {
|
||||
val = colVal.value.pData;
|
||||
|
@ -692,6 +693,20 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) {
|
|||
if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
|
||||
goto FAIL;
|
||||
}
|
||||
#endif
|
||||
char val[65535 + 2];
|
||||
if (IS_STR_DATA_TYPE(colVal.type)) {
|
||||
memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
|
||||
varDataSetLen(val, colVal.value.nData);
|
||||
if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
|
||||
goto FAIL;
|
||||
}
|
||||
/*val = colVal.value.pData;*/
|
||||
} else {
|
||||
if (colDataAppend(pColData, i, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
|
||||
goto FAIL;
|
||||
}
|
||||
}
|
||||
}
|
||||
sourceIdx++;
|
||||
targetIdx++;
|
||||
|
@ -718,15 +733,19 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) {
|
|||
sourceIdx++;
|
||||
continue;
|
||||
} else if (colVal.cid == pColData->info.colId) {
|
||||
void* val = NULL;
|
||||
char val[65535 + 2];
|
||||
if (IS_STR_DATA_TYPE(colVal.type)) {
|
||||
val = colVal.value.pData;
|
||||
} else {
|
||||
val = &colVal.value.val;
|
||||
}
|
||||
memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
|
||||
varDataSetLen(val, colVal.value.nData);
|
||||
if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
|
||||
goto FAIL;
|
||||
}
|
||||
/*val = colVal.value.pData;*/
|
||||
} else {
|
||||
if (colDataAppend(pColData, i, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
|
||||
goto FAIL;
|
||||
}
|
||||
}
|
||||
|
||||
sourceIdx++;
|
||||
targetIdx++;
|
||||
|
|
|
@ -881,8 +881,6 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
|
|||
|
||||
pRsp->code = TSDB_CODE_SUCCESS;
|
||||
|
||||
vDebug("vvvvvvvvvvvv %p, %d", pReq, len);
|
||||
|
||||
// decode
|
||||
SDecoder dc = {0};
|
||||
tDecoderInit(&dc, pReq, len);
|
||||
|
|
|
@ -271,57 +271,58 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t walFetchBodyNew(SWalReader *pRead) {
|
||||
SWalCont *pReadHead = &pRead->pHead->head;
|
||||
static int32_t walFetchBodyNew(SWalReader *pReader) {
|
||||
SWalCont *pReadHead = &pReader->pHead->head;
|
||||
int64_t ver = pReadHead->version;
|
||||
|
||||
wDebug("vgId:%d, wal starts to fetch body, index:%" PRId64, pRead->pWal->cfg.vgId, ver);
|
||||
wDebug("vgId:%d, wal starts to fetch body, ver:%" PRId64 " ,len:%d", pReader->pWal->cfg.vgId, ver,
|
||||
pReadHead->bodyLen);
|
||||
|
||||
if (pRead->capacity < pReadHead->bodyLen) {
|
||||
SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
|
||||
if (pReader->capacity < pReadHead->bodyLen) {
|
||||
SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
|
||||
if (ptr == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
pRead->pHead = ptr;
|
||||
pReadHead = &pRead->pHead->head;
|
||||
pRead->capacity = pReadHead->bodyLen;
|
||||
pReader->pHead = ptr;
|
||||
pReadHead = &pReader->pHead->head;
|
||||
pReader->capacity = pReadHead->bodyLen;
|
||||
}
|
||||
|
||||
if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) {
|
||||
if (pReadHead->bodyLen != taosReadFile(pReader->pLogFile, pReadHead->body, pReadHead->bodyLen)) {
|
||||
if (pReadHead->bodyLen < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s",
|
||||
pRead->pWal->cfg.vgId, pRead->pHead->head.version, ver, tstrerror(terrno));
|
||||
pReader->pWal->cfg.vgId, pReader->pHead->head.version, ver, tstrerror(terrno));
|
||||
} else {
|
||||
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted",
|
||||
pRead->pWal->cfg.vgId, pRead->pHead->head.version, ver);
|
||||
pReader->pWal->cfg.vgId, pReader->pHead->head.version, ver);
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
}
|
||||
pRead->curInvalid = 1;
|
||||
pReader->curInvalid = 1;
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pReadHead->version != ver) {
|
||||
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
|
||||
pRead->pHead->head.version, ver);
|
||||
pRead->curInvalid = 1;
|
||||
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64, pReader->pWal->cfg.vgId,
|
||||
pReader->pHead->head.version, ver);
|
||||
pReader->curInvalid = 1;
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (walValidBodyCksum(pRead->pHead) != 0) {
|
||||
wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver);
|
||||
pRead->curInvalid = 1;
|
||||
if (walValidBodyCksum(pReader->pHead) != 0) {
|
||||
wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pReader->pWal->cfg.vgId, ver);
|
||||
pReader->curInvalid = 1;
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
wDebug("vgId:%d, index:%" PRId64 " is fetched, cursor advance", pRead->pWal->cfg.vgId, ver);
|
||||
pRead->curVersion = ver + 1;
|
||||
wDebug("vgId:%d, index:%" PRId64 " is fetched, cursor advance", pReader->pWal->cfg.vgId, ver);
|
||||
pReader->curVersion = ver + 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue