提交 3b296530 作者: Juan Batiz-Benet

updated msgio + secio

上级 9d304768
......@@ -110,7 +110,7 @@
},
{
"ImportPath": "github.com/jbenet/go-msgio",
"Rev": "753e598a1d24b311ee05c4ce001cff74e2a8e745"
"Rev": "281b085dc602c4f0377438e20331f45a91bcdf9c"
},
{
"ImportPath": "github.com/jbenet/go-multiaddr",
......
......@@ -45,6 +45,10 @@ type Reader interface {
// ReleaseMsg signals a buffer can be reused.
ReleaseMsg([]byte)
// NextMsgLen returns the length of the next (peeked) message. Does
// not destroy the message or have other adverse effects
NextMsgLen() (int, error)
}
// ReadCloser combines a Reader and Closer.
......@@ -142,9 +146,15 @@ func NewReaderWithPool(r io.Reader, p *mpool.Pool) ReadCloser {
}
}
// nextMsgLen reads the length of the next msg into s.lbuf, and returns it.
// WARNING: like ReadMsg, nextMsgLen is destructive. It reads from the internal
// NextMsgLen reads the length of the next msg into s.lbuf, and returns it.
// WARNING: like Read, NextMsgLen is destructive. It reads from the internal
// reader.
func (s *reader) NextMsgLen() (int, error) {
s.lock.Lock()
defer s.lock.Unlock()
return s.nextMsgLen()
}
func (s *reader) nextMsgLen() (int, error) {
if s.next == -1 {
if _, err := io.ReadFull(s.R, s.lbuf); err != nil {
......
// Package mpool provides a sync.Pool equivalent that buckets incoming
// requests to one of 32 sub-pools, one for each power of 2, 0-32.
//
// import "github.com/jbenet/go-msgio/mpool"
// var p mpool.Pool
//
// small := make([]byte, 1024)
// large := make([]byte, 4194304)
// p.Put(1024, small)
// p.Put(4194304, large)
//
// small2 := p.Get(1024).([]byte)
// large2 := p.Get(4194304).([]byte)
// fmt.Println("small2 len:", len(small2))
// fmt.Println("large2 len:", len(large2))
//
// // Output:
// // small2 len: 1024
// // large2 len: 4194304
//
package mpool
import (
"fmt"
"sync"
)
// ByteSlicePool is a static Pool for reusing byteslices of various sizes.
var ByteSlicePool Pool
func init() {
ByteSlicePool.New = func(length int) interface{} {
return make([]byte, length)
}
}
// MaxLength is the maximum length of an element that can be added to the Pool.
const MaxLength = (1 << 32) - 1
// Pool is a pool to handle cases of reusing elements of varying sizes.
// It maintains up to 32 internal pools, for each power of 2 in 0-32.
type Pool struct {
small int // the size of the first pool
pools [32]*sync.Pool // a list of singlePools
sync.Mutex // protecting list
// New is a function that constructs a new element in the pool, with given len
New func(len int) interface{}
}
func (p *Pool) getPool(idx uint32) *sync.Pool {
if idx > uint32(len(p.pools)) {
panic(fmt.Errorf("index too large: %d", idx))
}
p.Lock()
defer p.Unlock()
sp := p.pools[idx]
if sp == nil {
sp = new(sync.Pool)
p.pools[idx] = sp
}
return sp
}
// Get selects an arbitrary item from the Pool, removes it from the Pool,
// and returns it to the caller. Get may choose to ignore the pool and
// treat it as empty. Callers should not assume any relation between values
// passed to Put and the values returned by Get.
//
// If Get would otherwise return nil and p.New is non-nil, Get returns the
// result of calling p.New.
func (p *Pool) Get(length uint32) interface{} {
idx := largerPowerOfTwo(length)
sp := p.getPool(idx)
val := sp.Get()
if val == nil && p.New != nil {
val = p.New(0x1 << idx)
}
return val
}
// Put adds x to the pool.
func (p *Pool) Put(length uint32, val interface{}) {
if length > MaxLength {
length = MaxLength
}
idx := smallerPowerOfTwo(length)
sp := p.getPool(idx)
sp.Put(val)
}
func largerPowerOfTwo(num uint32) uint32 {
for p := uint32(0); p < 32; p++ {
if (0x1 << p) >= num {
return p
}
}
panic("unreachable")
}
func smallerPowerOfTwo(num uint32) uint32 {
for p := uint32(1); p < 32; p++ {
if (0x1 << p) > num {
return p - 1
}
}
panic("unreachable")
}
......@@ -87,22 +87,34 @@ func NewETMReader(r io.Reader, s cipher.Stream, mac HMAC) msgio.ReadCloser {
return &etmReader{msg: msgio.NewReader(r), str: s, mac: mac}
}
func (r *etmReader) NextMsgLen() (int, error) {
return r.msg.NextMsgLen()
}
func (r *etmReader) Read(buf []byte) (int, error) {
// first, check the buffer has enough space.
fullLen, err := r.msg.NextMsgLen()
if err != nil {
return 0, err
}
dataLen := fullLen - r.mac.size
if cap(buf) < dataLen {
return 0, io.ErrShortBuffer
}
buf2 := buf
changed := false
if cap(buf2) < (len(buf) + r.mac.size) {
buf2 = make([]byte, len(buf)+r.mac.size)
if cap(buf) < fullLen {
buf2 = make([]byte, fullLen)
changed = true
}
buf2 = buf2[:fullLen]
// WARNING: assumes msg.Read will only read _one_ message. this is what
// msgio is supposed to do. but msgio may change in the future. may this
// comment be your guiding light.
n, err := r.msg.Read(buf2)
n, err := io.ReadFull(r.msg, buf2)
if err != nil {
return n, err
}
buf2 = buf2[:n]
m, err := r.macCheckThenDecrypt(buf2)
if err != nil {
......
......@@ -115,6 +115,10 @@ func (c *singleConn) Write(buf []byte) (int, error) {
return c.msgrw.Write(buf)
}
func (c *singleConn) NextMsgLen() (int, error) {
return c.msgrw.NextMsgLen()
}
// ReadMsg reads data, net.Conn style
func (c *singleConn) ReadMsg() ([]byte, error) {
return c.msgrw.ReadMsg()
......
......@@ -266,6 +266,14 @@ func (c *MultiConn) Write(buf []byte) (int, error) {
return bc.Write(buf)
}
func (c *MultiConn) NextMsgLen() (int, error) {
bc := c.BestConn()
if bc == nil {
return 0, errors.New("no best connection")
}
return bc.NextMsgLen()
}
// ReadMsg reads data, net.Conn style
func (c *MultiConn) ReadMsg() ([]byte, error) {
next, ok := <-c.fanIn
......
......@@ -93,6 +93,10 @@ func (c *secureConn) Write(buf []byte) (int, error) {
return c.secure.Write(buf)
}
func (c *secureConn) NextMsgLen() (int, error) {
return c.secure.NextMsgLen()
}
// ReadMsg reads data, net.Conn style
func (c *secureConn) ReadMsg() ([]byte, error) {
return c.secure.ReadMsg()
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论