У меня очень большие наборы данных в клиентах для мобильных устройств и настольных компьютеров, которые я пытаюсь двунаправленно синхронизировать со своим веб-сайтом.
В настоящее время я настроил поток, в котором я отправляю данные на сервер частями. Когда обработка каждого фрагмента завершена и сервер успешно зарегистрировал этот фрагмент данных, мне нужно отправить обратно данные ответа, содержащие информацию реестра, которая помогает поддерживать синхронизацию сервера и клиента.
После получения этого ответа клиент загрузит еще один фрагмент, и процесс начнется снова. Данные от клиента передаются на сервер, чтобы снизить потребление памяти клиентом.
Я перепробовал множество комбинаций, чтобы посмотреть, смогу ли я заставить это работать. Во всех случаях, если сервер записывает ответ, а клиент оставляет поток открытым (ожидая ответа от сервера для продолжения), то я не получаю ответ сервера до тех пор, пока не истечет время ожидания соединения.
Итак, есть ли способ сделать это в потоке?
Отправьте данные – получите ответ Отправьте больше данных – получите ответ И так далее...?
Ниже приведен небольшой надуманный пример того, как мне удалось успешно прочитать потоковые данные, но не отправить обратно несколько ответов. ПРИМЕЧАНИЕ. Фактическая реализация не отправляет ответ на каждую новую строку, это просто упрощение для демонстрационных целей.
func UserSyncHandler(db *psql.Database, cache *serverCache.Cache) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
reader := bufio.NewReader(r.Body)
for {
line, err := reader.ReadBytes('\n')
if err != nil {
if err == io.EOF {
logger.Info("Received EOF")
break // naturally reached end of stream
}
writeError(w, "Failed to read from sync stream", err, http.StatusInternalServerError)
return
}
// Send response here
writeData([]byte("confirmation data"))
}
}
}
🤔 А знаете ли вы, что...
В Go отсутствуют скобки для управления блоками кода; вместо этого используются отступы.
Websocket не поддерживает h2
sse — это односторонний поток, и таймаут Go младшей версии его не поддерживает.
Обратитесь к реализации sse
Пожалуйста, запустите его:
package main
import (
"encoding/json"
"fmt"
"io"
"log"
"math/rand"
"net"
"net/http"
"time"
)
const (
HeaderAccept = "Accept"
HeaderCacheControl = "Cache-Control"
HeaderConnection = "Connection"
HeaderContentType = "Content-Type"
MimeTextEventStream = "text/event-stream"
HeaderTransferEncoding = "Transfer-Encoding"
)
type Request struct {
Index int
}
type Response struct {
Index int
}
func main() {
go func() {
// Wait listen port
time.Sleep(time.Second)
ch := make(chan *Request, 8)
ch <- &Request{}
for {
resp, err := NewClientStream("GET", "http://localhost:8088/call", ch, func(req *Request, resp *Response) error {
fmt.Println("client:", req.Index, resp.Index)
time.Sleep(time.Second*3)
ch <- &Request{rand.Intn(256)}
return nil
})
if err != nil {
log.Println(err)
}
if resp != nil && resp.StatusCode == http.StatusNoContent {
return
}
}
}()
ln ,err := net.Listen("tcp",":8088")
if err != nil {
panic(err)
}
log.Println("listen :8088")
srv := http.Server{
// test timeout
ReadTimeout: time.Second,
WriteTimeout: time.Second,
IdleTimeout: time.Second,
Handler: NewHandlerStream(func(req *Request) (any, error) {
resp := &Response{req.Index + 1}
fmt.Println("server:", req.Index, resp.Index)
return resp, nil
}),
}
srv.Serve(ln)
srv.ListenAndServe()
}
func NewHandlerStream[T any](rpc func(*T) (any, error)) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Header.Get(HeaderAccept) != MimeTextEventStream {
return
}
w.Header().Set(HeaderContentType, MimeTextEventStream)
w.Header().Set(HeaderTransferEncoding, "chunked")
w.Header().Set(HeaderConnection, "keep-alive")
w.Header().Set(HeaderCacheControl, "no-cache")
w.WriteHeader(http.StatusOK)
ctl := http.NewResponseController(w)
ctl.EnableFullDuplex()
ctl.SetReadDeadline(time.Time{})
ctl.SetWriteDeadline(time.Time{})
ctl.Flush()
de := json.NewDecoder(r.Body)
en := json.NewEncoder(w)
for {
// receive
req := new(T)
err := de.Decode(req)
if err != nil {
return
}
resp, err := rpc(req)
if err != nil {
return
}
err = en.Encode(resp)
if err != nil {
return
}
ctl.Flush()
}
}
}
func NewClientStream[T, P any](method, path string, ch chan *P, call func(*P, *T) error) (*http.Response, error) {
r, w := io.Pipe()
req, err := http.NewRequest(method, path, r)
if err != nil {
return nil, err
}
req.Header.Set(HeaderAccept, MimeTextEventStream)
req.Header.Set(HeaderTransferEncoding, "chunked")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode == http.StatusNoContent {
return resp, err
}
if resp.StatusCode != http.StatusOK {
return resp, fmt.Errorf("response status code is %d", resp.StatusCode)
}
de := json.NewDecoder(resp.Body)
en := json.NewEncoder(w)
for {
// send request
data := <-ch
err := en.Encode(data)
if err != nil {
return resp, err
}
// receive
receive := new(T)
err = de.Decode(receive)
if err != nil {
return resp, err
}
// handler
err = call(data, receive)
if err != nil {
return resp, err
}
}
}
Огромное спасибо LaGec и Серизе Лимон.
Ниже приведен модифицированный, надуманный пример, показывающий, как мне удалось заставить его работать.
func UserSyncHandler(db *psql.Database, cache *serverCache.Cache) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// Added as noted in a comment
controller := http.NewResponseController(w)
_ = controller.EnableFullDuplex() // no error reported
reader := bufio.NewReader(r.Body)
for {
line, err := reader.ReadBytes('\n')
if err != nil {
if err == io.EOF {
logger.Info("Received EOF")
break // naturally reached end of stream
}
writeError(w, "Failed to read from sync stream", err, http.StatusInternalServerError)
return
}
// Send response here
writeData([]byte("confirmation data"))
// Added as noted in another comment
controller.Flush()
}
}
}