go-multistream

replace AddHandler with Listen

Open marten-seemann opened this issue 4 years ago • 2 comments

@Stebalien suggested replacing AddHandler (which takes a callback) with a Listen(... protocol.ID) function that returns a Listener, on which an Accept method can be called to accept streams.

This would be more idiomatic, as it closely mirrors how net.Conns and streams are accepted. It would also move goroutine handling to the application, which is arguably where it belongs.

marten-seemann avatar Dec 17 '21 06:12 marten-seemann

  • It would also provide a backpressure signal.
  • It would provide a nice and safe way to stop listening on protocols atomically.
  • It would allow workers to pull new streams off the listener.
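
To make the worker and backpressure points concrete, here is a rough sketch of a fixed pool of workers pulling from a listener. Everything in it is a stand-in and not go-multistream API: `stream`, `listener`, `newListener`, `runWorkers` and `demo` are hypothetical names, and the producer blocking on a full queue is just one assumed way backpressure could surface.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// stream is a hypothetical stand-in for a negotiated stream.
type stream struct{ id int }

// listener is a hypothetical stand-in for the proposed Listener.
type listener struct {
	queue  chan stream
	closed chan struct{}
	once   sync.Once
}

func newListener(backlog int) *listener {
	return &listener{queue: make(chan stream, backlog), closed: make(chan struct{})}
}

// Accept blocks until a stream is queued or the listener is closed.
func (l *listener) Accept() (stream, error) {
	select {
	case s := <-l.queue:
		return s, nil
	case <-l.closed:
		return stream{}, errors.New("listener closed")
	}
}

// Close stops all pending and future Accept calls. It is safe to call twice.
func (l *listener) Close() error {
	l.once.Do(func() { close(l.closed) })
	return nil
}

// runWorkers starts n goroutines that pull streams until the listener is closed.
func runWorkers(l *listener, n int, handle func(stream)) *sync.WaitGroup {
	wg := &sync.WaitGroup{}
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				s, err := l.Accept()
				if err != nil {
					return // listener closed
				}
				handle(s)
			}
		}()
	}
	return wg
}

// demo feeds the given number of streams to a worker pool and returns how many were handled.
func demo(streams, workers int) int {
	l := newListener(2)
	var handled int64
	var done sync.WaitGroup
	done.Add(streams)
	wg := runWorkers(l, workers, func(stream) {
		atomic.AddInt64(&handled, 1)
		done.Done()
	})
	for i := 0; i < streams; i++ {
		l.queue <- stream{id: i} // blocks while the backlog is full and all workers are busy
	}
	done.Wait() // every stream has been handled
	l.Close()   // unblocks the idle workers
	wg.Wait()
	return int(atomic.LoadInt64(&handled))
}

func main() {
	fmt.Println(demo(10, 3)) // prints 10
}
```

The application decides how many goroutines exist, and a single Close call stops all of them, which is hard to express with a callback passed to AddHandler.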

Stebalien avatar Dec 17 '21 07:12 Stebalien

This will be more involved than expected: multistream is not only used for stream negotiation, but also to negotiate the security protocol (as long as we don't have Protocol Select) and the stream muxer. That's the reason go-multistream currently acts on io.ReadWriteClosers. It would be super nice to get rid of the type assertions and use generics:

type conn interface { // TODO: find a more descriptive name than conn
    io.ReadWriteCloser
    Reset() error
    Protocol() protocol.ID
}

type Listener[T conn] interface {
    io.Closer
    Accept() (T, error)
}

type MultistreamMuxer[T conn] struct {
    // fields omitted
}

// Listen returns a Listener for the given protocols (body omitted in this sketch).
func (m *MultistreamMuxer[T]) Listen(protos ...protocol.ID) (Listener[T], error)

These types will be instantiated (or whatever the right generics-lingo term for this is) with a (wrapped) net.Conn, a network.SecureConn and a network.Stream.
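
As a rough illustration of what instantiating these types could look like, here is a self-contained sketch (requires Go 1.18+). `ProtocolID` stands in for protocol.ID, and `fakeStream`, `chanListener` and `acceptOne` are hypothetical names made up for the example. The point is only that Accept hands back the concrete type, so the caller needs no type assertion.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// ProtocolID is a stand-in for protocol.ID so the sketch has no dependencies.
type ProtocolID string

type conn interface {
	io.ReadWriteCloser
	Reset() error
	Protocol() ProtocolID
}

type Listener[T conn] interface {
	io.Closer
	Accept() (T, error)
}

// fakeStream is a hypothetical concrete type playing the role of network.Stream.
type fakeStream struct{ proto ProtocolID }

func (s *fakeStream) Read(p []byte) (int, error)  { return 0, io.EOF }
func (s *fakeStream) Write(p []byte) (int, error) { return len(p), nil }
func (s *fakeStream) Close() error                { return nil }
func (s *fakeStream) Reset() error                { return nil }
func (s *fakeStream) Protocol() ProtocolID        { return s.proto }

// Describe exists only on fakeStream, not on the conn interface.
func (s *fakeStream) Describe() string { return "fake stream for " + string(s.proto) }

// chanListener is a hypothetical channel-backed implementation of Listener[T].
type chanListener[T conn] struct {
	queue  chan T
	closed chan struct{}
}

func (l *chanListener[T]) Accept() (T, error) {
	select {
	case c := <-l.queue:
		return c, nil
	case <-l.closed:
		var zero T
		return zero, errors.New("listener closed")
	}
}

// Close must be called at most once in this sketch.
func (l *chanListener[T]) Close() error {
	close(l.closed)
	return nil
}

// acceptOne instantiates the listener with *fakeStream and accepts one stream.
func acceptOne() string {
	l := &chanListener[*fakeStream]{
		queue:  make(chan *fakeStream, 1),
		closed: make(chan struct{}),
	}
	l.queue <- &fakeStream{proto: "/echo/1.0.0"}

	var ln Listener[*fakeStream] = l
	s, err := ln.Accept()
	if err != nil {
		return err.Error()
	}
	return s.Describe() // s is a *fakeStream, no type assertion needed
}

func main() {
	fmt.Println(acceptOne()) // prints "fake stream for /echo/1.0.0"
}
```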

We could then have an Accept function that works like this:

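type listener struct {
	queue     chan conn     // buffered chan (of length 4 or 8)?
	closeChan chan struct{} // closed when the listener is closed
}

// addStream is called by the multistream muxer as soon as the protocol was negotiated.
func (l *listener) addStream(c conn) {
	select {
	case l.queue <- c:
	default:
		// The queue is full: drop the stream instead of blocking the muxer.
		log.Warnf("dropping stream for protocol %s", c.Protocol())
		c.Reset()
	}
}

// Accept blocks until a stream is available or the listener is closed.
func (l *listener) Accept() (conn, error) {
	select {
	case <-l.closeChan:
		return nil, errors.New("listener closed")
	case str := <-l.queue:
		return str, nil
	}
}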
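
To check the drop-on-full behavior of addStream in isolation, here is a small runnable sketch. `fakeConn`, `newListener` and `offer` are hypothetical helpers for the example, logging is left out, and Accept is omitted because nothing is accepted here.

```go
package main

import "fmt"

// fakeConn is a hypothetical stand-in that records whether it was reset.
type fakeConn struct {
	id    int
	reset bool
}

func (c *fakeConn) Reset() error {
	c.reset = true
	return nil
}

type listener struct {
	queue chan *fakeConn
}

func newListener(size int) *listener {
	return &listener{queue: make(chan *fakeConn, size)}
}

// addStream queues the stream if there is room and resets it otherwise.
func (l *listener) addStream(c *fakeConn) {
	select {
	case l.queue <- c:
	default:
		c.Reset()
	}
}

// offer adds n streams while nobody calls Accept and returns the IDs that were reset.
func offer(queueSize, n int) []int {
	l := newListener(queueSize)
	conns := make([]*fakeConn, n)
	for i := range conns {
		conns[i] = &fakeConn{id: i}
		l.addStream(conns[i])
	}
	var dropped []int
	for _, c := range conns {
		if c.reset {
			dropped = append(dropped, c.id)
		}
	}
	return dropped
}

func main() {
	fmt.Println(offer(2, 4)) // prints [2 3]
}
```

With a queue of length 2 and no Accept calls, streams 0 and 1 are queued and streams 2 and 3 are reset. The remote peer sees the reset, which is how the backpressure becomes visible on the wire.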

marten-seemann avatar Dec 19 '21 07:12 marten-seemann