package h2quic import ( "crypto/tls" "errors" "fmt" "io" "net" "net/http" "strings" "sync" "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" "golang.org/x/net/idna" quic "github.com/lucas-clemente/quic-go" "github.com/lucas-clemente/quic-go/internal/protocol" "github.com/lucas-clemente/quic-go/internal/utils" "github.com/lucas-clemente/quic-go/qerr" ) type roundTripperOpts struct { DisableCompression bool } var dialAddr = quic.DialAddr // client is a HTTP2 client doing QUIC requests type client struct { mutex sync.RWMutex tlsConf *tls.Config config *quic.Config opts *roundTripperOpts hostname string handshakeErr error dialOnce sync.Once dialer func(network, addr string, tlsCfg *tls.Config, cfg *quic.Config) (quic.Session, error) session quic.Session headerStream quic.Stream headerErr *qerr.QuicError headerErrored chan struct{} // this channel is closed if an error occurs on the header stream requestWriter *requestWriter responses map[protocol.StreamID]chan *http.Response logger utils.Logger } var _ http.RoundTripper = &client{} var defaultQuicConfig = &quic.Config{ RequestConnectionIDOmission: true, KeepAlive: true, } // newClient creates a new client func newClient( hostname string, tlsConfig *tls.Config, opts *roundTripperOpts, quicConfig *quic.Config, dialer func(network, addr string, tlsCfg *tls.Config, cfg *quic.Config) (quic.Session, error), ) *client { config := defaultQuicConfig if quicConfig != nil { config = quicConfig } return &client{ hostname: authorityAddr("https", hostname), responses: make(map[protocol.StreamID]chan *http.Response), tlsConf: tlsConfig, config: config, opts: opts, headerErrored: make(chan struct{}), dialer: dialer, logger: utils.DefaultLogger.WithPrefix("client"), } } // dial dials the connection func (c *client) dial() error { var err error if c.dialer != nil { c.session, err = c.dialer("udp", c.hostname, c.tlsConf, c.config) } else { c.session, err = dialAddr(c.hostname, c.tlsConf, c.config) } if err != nil { return err } // once the version has been negotiated, open the header stream c.headerStream, err = c.session.OpenStream() if err != nil { return err } c.requestWriter = newRequestWriter(c.headerStream, c.logger) go c.handleHeaderStream() return nil } func (c *client) handleHeaderStream() { decoder := hpack.NewDecoder(4096, func(hf hpack.HeaderField) {}) h2framer := http2.NewFramer(nil, c.headerStream) var err error for err == nil { err = c.readResponse(h2framer, decoder) } if quicErr, ok := err.(*qerr.QuicError); !ok || quicErr.ErrorCode != qerr.PeerGoingAway { c.logger.Debugf("Error handling header stream: %s", err) } c.headerErr = qerr.Error(qerr.InvalidHeadersStreamData, err.Error()) // stop all running request close(c.headerErrored) } func (c *client) readResponse(h2framer *http2.Framer, decoder *hpack.Decoder) error { frame, err := h2framer.ReadFrame() if err != nil { return err } hframe, ok := frame.(*http2.HeadersFrame) if !ok { return errors.New("not a headers frame") } mhframe := &http2.MetaHeadersFrame{HeadersFrame: hframe} mhframe.Fields, err = decoder.DecodeFull(hframe.HeaderBlockFragment()) if err != nil { return fmt.Errorf("cannot read header fields: %s", err.Error()) } c.mutex.RLock() responseChan, ok := c.responses[protocol.StreamID(hframe.StreamID)] c.mutex.RUnlock() if !ok { return fmt.Errorf("response channel for stream %d not found", hframe.StreamID) } rsp, err := responseFromHeaders(mhframe) if err != nil { return err } responseChan <- rsp return nil } // Roundtrip executes a request and returns a response func (c *client) RoundTrip(req *http.Request) (*http.Response, error) { // TODO: add port to address, if it doesn't have one if req.URL.Scheme != "https" { return nil, errors.New("quic http2: unsupported scheme") } if authorityAddr("https", hostnameFromRequest(req)) != c.hostname { return nil, fmt.Errorf("h2quic Client BUG: RoundTrip called for the wrong client (expected %s, got %s)", c.hostname, req.Host) } c.dialOnce.Do(func() { c.handshakeErr = c.dial() }) if c.handshakeErr != nil { return nil, c.handshakeErr } hasBody := (req.Body != nil) responseChan := make(chan *http.Response) dataStream, err := c.session.OpenStreamSync() if err != nil { _ = c.closeWithError(err) return nil, err } c.mutex.Lock() c.responses[dataStream.StreamID()] = responseChan c.mutex.Unlock() var requestedGzip bool if !c.opts.DisableCompression && req.Header.Get("Accept-Encoding") == "" && req.Header.Get("Range") == "" && req.Method != "HEAD" { requestedGzip = true } // TODO: add support for trailers endStream := !hasBody err = c.requestWriter.WriteRequest(req, dataStream.StreamID(), endStream, requestedGzip) if err != nil { _ = c.closeWithError(err) return nil, err } resc := make(chan error, 1) if hasBody { go func() { resc <- c.writeRequestBody(dataStream, req.Body) }() } var res *http.Response var receivedResponse bool var bodySent bool if !hasBody { bodySent = true } ctx := req.Context() for !(bodySent && receivedResponse) { select { case res = <-responseChan: receivedResponse = true c.mutex.Lock() delete(c.responses, dataStream.StreamID()) c.mutex.Unlock() case err := <-resc: bodySent = true if err != nil { return nil, err } case <-ctx.Done(): // error code 6 signals that stream was canceled dataStream.CancelRead(6) dataStream.CancelWrite(6) c.mutex.Lock() delete(c.responses, dataStream.StreamID()) c.mutex.Unlock() return nil, ctx.Err() case <-c.headerErrored: // an error occurred on the header stream _ = c.closeWithError(c.headerErr) return nil, c.headerErr } } // TODO: correctly set this variable var streamEnded bool isHead := (req.Method == "HEAD") res = setLength(res, isHead, streamEnded) if streamEnded || isHead { res.Body = noBody } else { res.Body = dataStream if requestedGzip && res.Header.Get("Content-Encoding") == "gzip" { res.Header.Del("Content-Encoding") res.Header.Del("Content-Length") res.ContentLength = -1 res.Body = &gzipReader{body: res.Body} res.Uncompressed = true } } res.Request = req return res, nil } func (c *client) writeRequestBody(dataStream quic.Stream, body io.ReadCloser) (err error) { defer func() { cerr := body.Close() if err == nil { // TODO: what to do with dataStream here? Maybe reset it? err = cerr } }() _, err = io.Copy(dataStream, body) if err != nil { // TODO: what to do with dataStream here? Maybe reset it? return err } return dataStream.Close() } func (c *client) closeWithError(e error) error { if c.session == nil { return nil } return c.session.CloseWithError(quic.ErrorCode(qerr.InternalError), e) } // Close closes the client func (c *client) Close() error { if c.session == nil { return nil } return c.session.Close() } // copied from net/transport.go // authorityAddr returns a given authority (a host/IP, or host:port / ip:port) // and returns a host:port. The port 443 is added if needed. func authorityAddr(scheme string, authority string) (addr string) { host, port, err := net.SplitHostPort(authority) if err != nil { // authority didn't have a port port = "443" if scheme == "http" { port = "80" } host = authority } if a, err := idna.ToASCII(host); err == nil { host = a } // IPv6 address literal, without a port: if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") { return host + ":" + port } return net.JoinHostPort(host, port) }