Golang HTTP Handler – можно ли реализовать шаблон вызова и ответа в одном потоковом POST?

У меня очень большие наборы данных в клиентах для мобильных устройств и настольных компьютеров, которые я пытаюсь двунаправленно синхронизировать со своим веб-сайтом.

В настоящее время я настроил поток, в котором я отправляю данные на сервер частями. Когда обработка каждого фрагмента завершена и сервер успешно зарегистрировал этот фрагмент данных, мне нужно отправить обратно данные ответа, содержащие информацию реестра, которая помогает поддерживать синхронизацию сервера и клиента.

После получения этого ответа клиент загрузит еще один фрагмент, и процесс начнется снова. Данные от клиента передаются на сервер, чтобы снизить потребление памяти клиентом.

Я перепробовал множество комбинаций, чтобы посмотреть, смогу ли я заставить это работать. Во всех случаях, если сервер записывает ответ, а клиент оставляет поток открытым (ожидая ответа от сервера для продолжения), то я не получаю ответ сервера до тех пор, пока не истечет время ожидания соединения.

Итак, есть ли способ сделать это в потоке?

Отправьте данные – получите ответ Отправьте больше данных – получите ответ И так далее...?

Ниже приведен небольшой надуманный пример того, как мне удалось успешно прочитать потоковые данные, но не отправить обратно несколько ответов. ПРИМЕЧАНИЕ. Фактическая реализация не отправляет ответ на каждую новую строку, это просто упрощение для демонстрационных целей.

func UserSyncHandler(db *psql.Database, cache *serverCache.Cache) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {  
        reader := bufio.NewReader(r.Body) 
        for {
            line, err := reader.ReadBytes('\n')
            if err != nil {
                if err == io.EOF {
                    logger.Info("Received EOF")
                    break // naturally reached end of stream
                }
                writeError(w, "Failed to read from sync stream", err, http.StatusInternalServerError)
                return
            }
            // Send response here
            writeData([]byte("confirmation data"))
        }
    }
}

🤔 А знаете ли вы, что...
В Go отсутствуют скобки для управления блоками кода; вместо этого используются отступы.


1
62
2

Ответы:

  1. Websocket не поддерживает h2

  2. sse — это односторонний поток, и таймаут Go младшей версии его не поддерживает.

  3. Обратитесь к реализации sse

Пожалуйста, запустите его:

package main

import (
    "encoding/json"
    "fmt"
    "io"
    "log"
    "math/rand"
    "net"
    "net/http"
    "time"
)

const (
    HeaderAccept           = "Accept"
    HeaderCacheControl     = "Cache-Control"
    HeaderConnection       = "Connection"
    HeaderContentType      = "Content-Type"
    MimeTextEventStream    = "text/event-stream"
    HeaderTransferEncoding = "Transfer-Encoding"
)

type Request struct {
    Index int
}

type Response struct {
    Index int
}

func main() {
    go func() {
        // Wait listen port
        time.Sleep(time.Second)
        ch := make(chan *Request, 8)
        ch <- &Request{}

        for {
            resp, err := NewClientStream("GET", "http://localhost:8088/call", ch, func(req *Request, resp *Response) error {
                fmt.Println("client:", req.Index, resp.Index)
                time.Sleep(time.Second*3)
                ch <- &Request{rand.Intn(256)}
                return nil
            })

            if err != nil {
                log.Println(err)
            }
            if resp != nil && resp.StatusCode == http.StatusNoContent {
                return
            }
        }
    }()

    ln ,err := net.Listen("tcp",":8088")
    if err != nil {
        panic(err)
    }
    log.Println("listen :8088")

    srv := http.Server{
        // test timeout
        ReadTimeout:  time.Second,
        WriteTimeout: time.Second,
        IdleTimeout:  time.Second,
        Handler: NewHandlerStream(func(req *Request) (any, error) {
            resp := &Response{req.Index + 1}
            fmt.Println("server:", req.Index, resp.Index)
            return resp, nil
        }),
    }
    srv.Serve(ln)
    srv.ListenAndServe()
}

func NewHandlerStream[T any](rpc func(*T) (any, error)) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        if r.Header.Get(HeaderAccept) != MimeTextEventStream {
            return
        }

        w.Header().Set(HeaderContentType, MimeTextEventStream)
        w.Header().Set(HeaderTransferEncoding, "chunked")
        w.Header().Set(HeaderConnection, "keep-alive")
        w.Header().Set(HeaderCacheControl, "no-cache")
        w.WriteHeader(http.StatusOK)
        ctl := http.NewResponseController(w)
        ctl.EnableFullDuplex()
        ctl.SetReadDeadline(time.Time{})
        ctl.SetWriteDeadline(time.Time{})
        ctl.Flush()

        de := json.NewDecoder(r.Body)
        en := json.NewEncoder(w)
        for {
            // receive
            req := new(T)
            err := de.Decode(req)
            if err != nil {
                return
            }

            resp, err := rpc(req)
            if err != nil {
                return
            }

            err = en.Encode(resp)
            if err != nil {
                return
            }
            ctl.Flush()
        }
    }
}

func NewClientStream[T, P any](method, path string, ch chan *P, call func(*P, *T) error) (*http.Response, error) {
    r, w := io.Pipe()
    req, err := http.NewRequest(method, path, r)
    if err != nil {
        return nil, err
    }
    req.Header.Set(HeaderAccept, MimeTextEventStream)
    req.Header.Set(HeaderTransferEncoding, "chunked")

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return nil, err
    }

    if resp.StatusCode == http.StatusNoContent {
        return resp, err
    }
    if resp.StatusCode != http.StatusOK {
        return resp, fmt.Errorf("response status code is %d", resp.StatusCode)
    }

    de := json.NewDecoder(resp.Body)
    en := json.NewEncoder(w)

    for {
        // send request
        data := <-ch
        err := en.Encode(data)
        if err != nil {
            return resp, err
        }

        // receive
        receive := new(T)
        err = de.Decode(receive)
        if err != nil {
            return resp, err
        }

        // handler
        err = call(data, receive)
        if err != nil {
            return resp, err
        }
    }
}

Решено

Огромное спасибо LaGec и Серизе Лимон.

Ниже приведен модифицированный, надуманный пример, показывающий, как мне удалось заставить его работать.

func UserSyncHandler(db *psql.Database, cache *serverCache.Cache) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
    
        // Added as noted in a comment
        controller := http.NewResponseController(w)
        _ = controller.EnableFullDuplex() // no error reported 
  
        reader := bufio.NewReader(r.Body) 
        for {
            line, err := reader.ReadBytes('\n')
            if err != nil {
                if err == io.EOF {
                    logger.Info("Received EOF")
                    break // naturally reached end of stream
                }
                writeError(w, "Failed to read from sync stream", err, http.StatusInternalServerError)
                return
            }
            // Send response here
            writeData([]byte("confirmation data"))
            // Added as noted in another comment
            controller.Flush()
        }
    }
}