提交 7ad2f527 作者: Jan Winkelmann

Send empty struct to pubsub cmd output to flush

So the HTTP headers get sent

License: MIT
Signed-off-by: 's avatarJan Winkelmann <j-winkelmann@tuhh.de>
上级 ccb46fa9
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"io" "io"
"strings"
"sync" "sync"
"time" "time"
...@@ -89,10 +90,13 @@ To use, the daemon must be run with '--enable-pubsub-experiment'. ...@@ -89,10 +90,13 @@ To use, the daemon must be run with '--enable-pubsub-experiment'.
go func() { go func() {
defer sub.Cancel() defer sub.Cancel()
defer close(out) defer close(out)
out <- floodsub.Message{}
for { for {
msg, err := sub.Next(req.Context()) msg, err := sub.Next(req.Context())
if err == io.EOF || err == context.Canceled { if err == io.EOF || err == context.Canceled {
break return
} else if err != nil { } else if err != nil {
res.SetError(err, cmds.ErrNormal) res.SetError(err, cmds.ErrNormal)
return return
...@@ -118,16 +122,30 @@ To use, the daemon must be run with '--enable-pubsub-experiment'. ...@@ -118,16 +122,30 @@ To use, the daemon must be run with '--enable-pubsub-experiment'.
}, },
Marshalers: cmds.MarshalerMap{ Marshalers: cmds.MarshalerMap{
cmds.Text: getPsMsgMarshaler(func(m *floodsub.Message) (io.Reader, error) { cmds.Text: getPsMsgMarshaler(func(m *floodsub.Message) (io.Reader, error) {
if m.Message == nil {
return strings.NewReader(""), nil
}
return bytes.NewReader(m.Data), nil return bytes.NewReader(m.Data), nil
}), }),
"ndpayload": getPsMsgMarshaler(func(m *floodsub.Message) (io.Reader, error) { "ndpayload": getPsMsgMarshaler(func(m *floodsub.Message) (io.Reader, error) {
if m.Message == nil {
return strings.NewReader("\n"), nil
}
m.Data = append(m.Data, '\n') m.Data = append(m.Data, '\n')
return bytes.NewReader(m.Data), nil return bytes.NewReader(m.Data), nil
}), }),
"lenpayload": getPsMsgMarshaler(func(m *floodsub.Message) (io.Reader, error) { "lenpayload": getPsMsgMarshaler(func(m *floodsub.Message) (io.Reader, error) {
buf := make([]byte, 8) buf := make([]byte, 8)
n := binary.PutUvarint(buf, uint64(len(m.Data)))
return io.MultiReader(bytes.NewReader(buf[:n]), bytes.NewReader(m.Data)), nil var data []byte
if m.Message != nil {
data = m.Data
}
n := binary.PutUvarint(buf, uint64(len(data)))
return io.MultiReader(bytes.NewReader(buf[:n]), bytes.NewReader(data)), nil
}), }),
}, },
Type: floodsub.Message{}, Type: floodsub.Message{},
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论