[Groonga-commit] groonga/grnci at 9551bfd [master] Rename GQTPConn to gqtpConn and add BufferSize to GQTPClientOptions.

Back to archive index

Susumu Yata null+****@clear*****
Thu Jul 27 11:31:51 JST 2017


Susumu Yata	2017-07-27 11:31:51 +0900 (Thu, 27 Jul 2017)

  New Revision: 9551bfdc3d82048f71b1a94b3521bf8d261ec76a
  https://github.com/groonga/grnci/commit/9551bfdc3d82048f71b1a94b3521bf8d261ec76a

  Message:
    Rename GQTPConn to gqtpConn and add BufferSize to GQTPClientOptions.

  Modified files:
    v2/gqtp.go

  Modified: v2/gqtp.go (+60 -88)
===================================================================
--- v2/gqtp.go    2017-07-27 11:12:18 +0900 (f967e72)
+++ v2/gqtp.go    2017-07-27 11:31:51 +0900 (d375e2c)
@@ -48,7 +48,7 @@ type gqtpHeader struct {
 
 // gqtpResponse is a GQTP response.
 type gqtpResponse struct {
-	conn   *GQTPConn  // Connection
+	conn   *gqtpConn  // Connection
 	head   gqtpHeader // Current header
 	err    error      // Error response
 	left   int        // Number of bytes left in the current chunk
@@ -56,7 +56,7 @@ type gqtpResponse struct {
 }
 
 // newGQTPResponse returns a new GQTP response.
-func newGQTPResponse(conn *GQTPConn, head gqtpHeader, name string) *gqtpResponse {
+func newGQTPResponse(conn *gqtpConn, head gqtpHeader, name string) *gqtpResponse {
 	resp := &gqtpResponse{
 		conn: conn,
 		head: head,
@@ -122,7 +122,7 @@ func (r *gqtpResponse) Close() error {
 		return nil
 	}
 	var err error
-	if _, e := io.CopyBuffer(ioutil.Discard, r, r.conn.getBuffer()); e != nil {
+	if _, e := io.CopyBuffer(ioutil.Discard, r, r.conn.buf); e != nil {
 		r.conn.broken = true
 		err = NewError(NetworkError, map[string]interface{}{
 			"method": "io.CopyBuffer",
@@ -156,19 +156,30 @@ func (r *gqtpResponse) Err() error {
 	return r.err
 }
 
-// GQTPConn is a thread-unsafe GQTP client.
-type GQTPConn struct {
-	client  *GQTPClient // Owner client if available
-	conn    net.Conn    // Connection to a GQTP server
-	buf     []byte      // Copy buffer
-	bufSize int         // Copy buffer size
-	ready   bool        // Whether or not the connection is ready to send a command
-	broken  bool        // Whether or not the connection is broken
+// gqtpConnOptions is options of gqtpConn.
+type gqtpConnOptions struct {
+	BufferSize int
 }
 
-// DialGQTP returns a new GQTPConn connected to a GQTP server.
+// newGQTPConnOptions returns the default gqtpConnOptions.
+func newGQTPConnOptions() *gqtpConnOptions {
+	return &gqtpConnOptions{
+		BufferSize: gqtpDefaultBufferSize,
+	}
+}
+
+// gqtpConn is a thread-unsafe GQTP client.
+type gqtpConn struct {
+	client *GQTPClient // Owner client if available
+	conn   net.Conn    // Connection to a GQTP server
+	buf    []byte      // Copy buffer
+	ready  bool        // Whether or not the connection is ready to send a command
+	broken bool        // Whether or not the connection is broken
+}
+
+// dialGQTP returns a new gqtpConn connected to a GQTP server.
 // The expected address format is [scheme://][host][:port].
-func DialGQTP(addr string) (*GQTPConn, error) {
+func dialGQTP(addr string, options *gqtpConnOptions) (*gqtpConn, error) {
 	a, err := ParseGQTPAddress(addr)
 	if err != nil {
 		return nil, err
@@ -181,20 +192,23 @@ func DialGQTP(addr string) (*GQTPConn, error) {
 			"error": err.Error(),
 		})
 	}
-	return NewGQTPConn(conn), nil
+	return newGQTPConn(conn, options), nil
 }
 
-// NewGQTPConn returns a new GQTPConn using an existing connection.
-func NewGQTPConn(conn net.Conn) *GQTPConn {
-	return &GQTPConn{
-		conn:    conn,
-		bufSize: gqtpDefaultBufferSize,
-		ready:   true,
+// newGQTPConn returns a new gqtpConn using an existing connection.
+func newGQTPConn(conn net.Conn, options *gqtpConnOptions) *gqtpConn {
+	if options == nil {
+		options = newGQTPConnOptions()
+	}
+	return &gqtpConn{
+		conn:  conn,
+		buf:   make([]byte, options.BufferSize),
+		ready: true,
 	}
 }
 
 // Close closes the connection.
-func (c *GQTPConn) Close() error {
+func (c *gqtpConn) Close() error {
 	if err := c.conn.Close(); err != nil {
 		return NewError(NetworkError, map[string]interface{}{
 			"method": "net.Conn.Close",
@@ -204,24 +218,8 @@ func (c *GQTPConn) Close() error {
 	return nil
 }
 
-// SetBufferSize updates the size of the copy buffer.
-func (c *GQTPConn) SetBufferSize(n int) {
-	if n <= 0 || n > gqtpMaxChunkSize {
-		n = gqtpDefaultBufferSize
-	}
-	c.bufSize = n
-}
-
-// getBuffer returns the copy buffer.
-func (c *GQTPConn) getBuffer() []byte {
-	if len(c.buf) != c.bufSize {
-		c.buf = make([]byte, c.bufSize)
-	}
-	return c.buf
-}
-
 // sendHeader sends a GQTP header.
-func (c *GQTPConn) sendHeader(flags byte, size int) error {
+func (c *gqtpConn) sendHeader(flags byte, size int) error {
 	head := gqtpHeader{
 		Protocol: gqtpProtocol,
 		Flags:    flags,
@@ -237,7 +235,7 @@ func (c *GQTPConn) sendHeader(flags byte, size int) error {
 }
 
 // sendChunkBytes sends data with flags.
-func (c *GQTPConn) sendChunkBytes(data []byte, flags byte) error {
+func (c *gqtpConn) sendChunkBytes(data []byte, flags byte) error {
 	if err := c.sendHeader(flags, len(data)); err != nil {
 		return err
 	}
@@ -251,7 +249,7 @@ func (c *GQTPConn) sendChunkBytes(data []byte, flags byte) error {
 }
 
 // sendChunkString sends data with flags.
-func (c *GQTPConn) sendChunkString(data string, flags byte) error {
+func (c *gqtpConn) sendChunkString(data string, flags byte) error {
 	if err := c.sendHeader(flags, len(data)); err != nil {
 		return err
 	}
@@ -265,7 +263,7 @@ func (c *GQTPConn) sendChunkString(data string, flags byte) error {
 }
 
 // recvHeader receives a GQTP header.
-func (c *GQTPConn) recvHeader() (gqtpHeader, error) {
+func (c *gqtpConn) recvHeader() (gqtpHeader, error) {
 	var head gqtpHeader
 	if err := binary.Read(c.conn, binary.BigEndian, &head); err != nil {
 		return head, NewError(NetworkError, map[string]interface{}{
@@ -277,7 +275,7 @@ func (c *GQTPConn) recvHeader() (gqtpHeader, error) {
 }
 
 // execNoBody sends a command without body and receives a response.
-func (c *GQTPConn) execNoBody(cmd string) (Response, error) {
+func (c *gqtpConn) execNoBody(cmd string) (Response, error) {
 	name := strings.TrimLeft(cmd, " \t\r\n")
 	if idx := strings.IndexAny(name, " \t\r\n"); idx != -1 {
 		name = name[:idx]
@@ -293,7 +291,7 @@ func (c *GQTPConn) execNoBody(cmd string) (Response, error) {
 }
 
 // execBody sends a command with body and receives a response.
-func (c *GQTPConn) execBody(cmd string, body io.Reader) (Response, error) {
+func (c *gqtpConn) execBody(cmd string, body io.Reader) (Response, error) {
 	name := strings.TrimLeft(cmd, " \t\r\n")
 	if idx := strings.IndexAny(name, " \t\r\n"); idx != -1 {
 		name = name[:idx]
@@ -309,12 +307,11 @@ func (c *GQTPConn) execBody(cmd string, body io.Reader) (Response, error) {
 		return newGQTPResponse(c, head, name), nil
 	}
 	n := 0
-	buf := c.getBuffer()
 	for {
-		m, err := body.Read(buf[n:])
+		m, err := body.Read(c.buf[n:])
 		n += m
 		if err != nil {
-			if err := c.sendChunkBytes(buf[:n], gqtpFlagTail); err != nil {
+			if err := c.sendChunkBytes(c.buf[:n], gqtpFlagTail); err != nil {
 				return nil, err
 			}
 			head, err = c.recvHeader()
@@ -323,8 +320,8 @@ func (c *GQTPConn) execBody(cmd string, body io.Reader) (Response, error) {
 			}
 			return newGQTPResponse(c, head, name), nil
 		}
-		if n == len(buf) {
-			if err := c.sendChunkBytes(buf, 0); err != nil {
+		if n == len(c.buf) {
+			if err := c.sendChunkBytes(c.buf, 0); err != nil {
 				return nil, err
 			}
 			head, err = c.recvHeader()
@@ -339,8 +336,8 @@ func (c *GQTPConn) execBody(cmd string, body io.Reader) (Response, error) {
 	}
 }
 
-// exec sends a command and receives a response.
-func (c *GQTPConn) exec(cmd string, body io.Reader) (Response, error) {
+// Exec sends a command and receives a response.
+func (c *gqtpConn) Exec(cmd string, body io.Reader) (Response, error) {
 	if c.broken {
 		return nil, NewError(OperationError, map[string]interface{}{
 			"error": "The connection is broken.",
@@ -364,38 +361,9 @@ func (c *GQTPConn) exec(cmd string, body io.Reader) (Response, error) {
 	return c.execBody(cmd, body)
 }
 
-// Exec assembles cmd and body into a Command and calls Query.
-// The GQTPConn must not be used until the response is closed.
-func (c *GQTPConn) Exec(cmd string, body io.Reader) (Response, error) {
-	command, err := ParseCommand(cmd)
-	if err != nil {
-		return nil, err
-	}
-	command.SetBody(body)
-	return c.Query(command)
-}
-
-// Invoke assembles name, params and body into a command and calls Query.
-func (c *GQTPConn) Invoke(name string, params map[string]interface{}, body io.Reader) (Response, error) {
-	cmd, err := NewCommand(name, params)
-	if err != nil {
-		return nil, err
-	}
-	cmd.SetBody(body)
-	return c.Query(cmd)
-}
-
-// Query sends a command and receives a response.
-// It is the caller's responsibility to close the response.
-func (c *GQTPConn) Query(cmd *Command) (Response, error) {
-	if err := cmd.Check(); err != nil {
-		return nil, err
-	}
-	return c.exec(cmd.String(), cmd.Body())
-}
-
 // GQTPClientOptions is options of GQTPClient.
 type GQTPClientOptions struct {
+	BufferSize   int // Buffer size
 	MaxIdleConns int // Maximum number of idle connections
 }
 
@@ -408,8 +376,9 @@ func NewGQTPClientOptions() *GQTPClientOptions {
 
 // GQTPClient is a thread-safe GQTP client.
 type GQTPClient struct {
-	addr      string
-	idleConns chan *GQTPConn
+	addr        string           // Server address
+	connOptions *gqtpConnOptions // Options for connections
+	idleConns   chan *gqtpConn   // Idle connections
 }
 
 // NewGQTPClient returns a new GQTPClient connected to a GQTP server.
@@ -418,13 +387,16 @@ func NewGQTPClient(addr string, options *GQTPClientOptions) (*GQTPClient, error)
 	if options == nil {
 		options = NewGQTPClientOptions()
 	}
-	conn, err := DialGQTP(addr)
+	connOptions := newGQTPConnOptions()
+	connOptions.BufferSize = options.BufferSize
+	conn, err := dialGQTP(addr, connOptions)
 	if err != nil {
 		return nil, err
 	}
 	c := &GQTPClient{
-		addr:      addr,
-		idleConns: make(chan *GQTPConn, options.MaxIdleConns),
+		addr:        addr,
+		connOptions: connOptions,
+		idleConns:   make(chan *gqtpConn, options.MaxIdleConns),
 	}
 	c.idleConns <- conn
 	conn.client = c
@@ -450,17 +422,17 @@ func (c *GQTPClient) Close() error {
 
 // exec sends a request and receives a response.
 func (c *GQTPClient) exec(cmd string, body io.Reader) (Response, error) {
-	var conn *GQTPConn
+	var conn *gqtpConn
 	var err error
 	select {
 	case conn = <-c.idleConns:
 	default:
-		conn, err = DialGQTP(c.addr)
+		conn, err = dialGQTP(c.addr, c.connOptions)
 		if err != nil {
 			return nil, err
 		}
 	}
-	resp, err := conn.exec(cmd, body)
+	resp, err := conn.Exec(cmd, body)
 	if err != nil {
 		conn.Close()
 		return nil, err
-------------- next part --------------
HTML����������������������������...
Télécharger 



More information about the Groonga-commit mailing list
Back to archive index