Skip to content

Commit 3ad6127

Browse files
authored
feat: optimize code (#149)
Co-authored-by: wencaiwulue <895703375@qq.com>
1 parent 14e91d5 commit 3ad6127

File tree

9 files changed

+62
-47
lines changed

9 files changed

+62
-47
lines changed

cmd/kubevpn/cmds/controlplane.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@ func CmdControlPlane(_ cmdutil.Factory) *cobra.Command {
2020
Hidden: true,
2121
Short: "Control-plane is a envoy xds server",
2222
Long: `Control-plane is a envoy xds server, distribute envoy route configuration`,
23-
Run: func(cmd *cobra.Command, args []string) {
23+
RunE: func(cmd *cobra.Command, args []string) error {
2424
util.InitLoggerForServer(config.Debug)
2525
go util.StartupPProf(0)
26-
controlplane.Main(watchDirectoryFilename, port, log.StandardLogger())
26+
err := controlplane.Main(cmd.Context(), watchDirectoryFilename, port, log.StandardLogger())
27+
return err
2728
},
2829
}
2930
cmd.Flags().StringVarP(&watchDirectoryFilename, "watchDirectoryFilename", "w", "/etc/envoy/envoy-config.yaml", "full path to directory to watch for files")

pkg/controlplane/main.go

+18-8
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,15 @@ import (
1010
log "github.com/sirupsen/logrus"
1111
)
1212

13-
func Main(filename string, port uint, logger *log.Logger) {
13+
func Main(ctx context.Context, filename string, port uint, logger *log.Logger) error {
1414
snapshotCache := cache.NewSnapshotCache(false, cache.IDHash{}, logger)
1515
proc := NewProcessor(snapshotCache, logger)
1616

17+
errChan := make(chan error, 2)
18+
1719
go func() {
18-
ctx := context.Background()
1920
server := serverv3.NewServer(ctx, snapshotCache, nil)
20-
RunServer(ctx, server, port)
21+
errChan <- RunServer(ctx, server, port)
2122
}()
2223

2324
notifyCh := make(chan NotifyMessage, 100)
@@ -29,20 +30,29 @@ func Main(filename string, port uint, logger *log.Logger) {
2930

3031
watcher, err := fsnotify.NewWatcher()
3132
if err != nil {
32-
log.Fatal(fmt.Errorf("failed to create file watcher, err: %v", err))
33+
return fmt.Errorf("failed to create file watcher: %v", err)
3334
}
3435
defer watcher.Close()
35-
if err = watcher.Add(filename); err != nil {
36-
log.Fatal(fmt.Errorf("failed to add file: %s to wather, err: %v", filename, err))
36+
err = watcher.Add(filename)
37+
if err != nil {
38+
return fmt.Errorf("failed to add file: %s to wather: %v", filename, err)
3739
}
3840
go func() {
39-
log.Fatal(Watch(watcher, filename, notifyCh))
41+
errChan <- Watch(watcher, filename, notifyCh)
4042
}()
4143

4244
for {
4345
select {
4446
case msg := <-notifyCh:
45-
proc.ProcessFile(msg)
47+
err = proc.ProcessFile(msg)
48+
if err != nil {
49+
log.Errorf("failed to process file: %v", err)
50+
return err
51+
}
52+
case err = <-errChan:
53+
return err
54+
case <-ctx.Done():
55+
return ctx.Err()
4656
}
4757
}
4858
}

pkg/controlplane/processor.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,11 @@ func (p *Processor) newVersion() string {
4444
return strconv.FormatInt(p.version, 10)
4545
}
4646

47-
func (p *Processor) ProcessFile(file NotifyMessage) {
47+
func (p *Processor) ProcessFile(file NotifyMessage) error {
4848
configList, err := ParseYaml(file.FilePath)
4949
if err != nil {
5050
p.logger.Errorf("error parsing yaml file: %+v", err)
51-
return
51+
return err
5252
}
5353
for _, config := range configList {
5454
if len(config.Uid) == 0 {
@@ -76,21 +76,22 @@ func (p *Processor) ProcessFile(file NotifyMessage) {
7676

7777
if err != nil {
7878
p.logger.Errorf("snapshot inconsistency: %v, err: %v", snapshot, err)
79-
return
79+
return err
8080
}
8181

8282
if err = snapshot.Consistent(); err != nil {
8383
p.logger.Errorf("snapshot inconsistency: %v, err: %v", snapshot, err)
84-
return
84+
return err
8585
}
8686
p.logger.Debugf("will serve snapshot %+v, nodeID: %s", snapshot, config.Uid)
8787
if err = p.cache.SetSnapshot(context.Background(), config.Uid, snapshot); err != nil {
8888
p.logger.Errorf("snapshot error %q for %v", err, snapshot)
89-
p.logger.Fatal(err)
89+
return err
9090
}
9191

9292
p.expireCache.Set(config.Uid, config, time.Minute*5)
9393
}
94+
return nil
9495
}
9596

9697
func ParseYaml(file string) ([]*Virtual, error) {

pkg/controlplane/server.go

+3-5
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@ const (
2121
grpcMaxConcurrentStreams = 1000000
2222
)
2323

24-
func RunServer(ctx context.Context, server serverv3.Server, port uint) {
24+
func RunServer(ctx context.Context, server serverv3.Server, port uint) error {
2525
grpcServer := grpc.NewServer(grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams))
2626

2727
var lc net.ListenConfig
2828
listener, err := lc.Listen(ctx, "tcp", fmt.Sprintf(":%d", port))
2929
if err != nil {
30-
log.Fatal(err)
30+
return err
3131
}
3232

3333
discoverygrpc.RegisterAggregatedDiscoveryServiceServer(grpcServer, server)
@@ -39,7 +39,5 @@ func RunServer(ctx context.Context, server serverv3.Server, port uint) {
3939
runtimeservice.RegisterRuntimeDiscoveryServiceServer(grpcServer, server)
4040

4141
log.Infof("management server listening on %d", port)
42-
if err = grpcServer.Serve(listener); err != nil {
43-
log.Fatal(err)
44-
}
42+
return grpcServer.Serve(listener)
4543
}

pkg/core/gvisorudphandler.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ func GvisorUDPListener(addr string) (net.Listener, error) {
125125
if err != nil {
126126
return nil, err
127127
}
128-
return &tcpKeepAliveListener{ln}, nil
128+
return &tcpKeepAliveListener{TCPListener: ln}, nil
129129
}
130130

131131
func handle(ctx context.Context, tcpConn net.Conn, udpConn *net.UDPConn) {

pkg/core/tcp.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func TCPListener(addr string) (net.Listener, error) {
2727
if err != nil {
2828
return nil, err
2929
}
30-
return &tcpKeepAliveListener{ln}, nil
30+
return &tcpKeepAliveListener{TCPListener: ln}, nil
3131
}
3232

3333
type tcpKeepAliveListener struct {

pkg/core/tunhandler.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,8 @@ func (h *tunHandler) Handle(ctx context.Context, tun net.Conn) {
142142
}
143143
}
144144

145-
func (h tunHandler) printRoute() {
146-
for {
145+
func (h *tunHandler) printRoute(ctx context.Context) {
146+
for ctx.Err() == nil {
147147
select {
148148
case <-time.Tick(time.Second * 5):
149149
var i int
@@ -370,7 +370,7 @@ func (d *Device) SetTunInboundHandler(handler func(tunInbound <-chan *DataElem,
370370
}
371371

372372
func (h *tunHandler) HandleServer(ctx context.Context, tun net.Conn) {
373-
go h.printRoute()
373+
go h.printRoute(ctx)
374374

375375
device := &Device{
376376
tun: tun,

pkg/handler/connect.go

+21-20
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
log "github.com/sirupsen/logrus"
3030
"github.com/spf13/pflag"
3131
"golang.org/x/crypto/ssh"
32-
"golang.org/x/sync/errgroup"
3332
"google.golang.org/grpc/metadata"
3433
v1 "k8s.io/api/core/v1"
3534
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -684,29 +683,31 @@ func (c *ConnectOptions) setupDNS(ctx context.Context, lite bool) error {
684683
}
685684

686685
func Run(ctx context.Context, servers []core.Server) error {
687-
group, ctx := errgroup.WithContext(ctx)
686+
errChan := make(chan error, len(servers))
688687
for i := range servers {
689-
i := i
690-
group.Go(func() error {
691-
l := servers[i].Listener
692-
defer l.Close()
693-
for {
694-
select {
695-
case <-ctx.Done():
696-
return ctx.Err()
697-
default:
688+
go func(i int) {
689+
errChan <- func() error {
690+
svr := servers[i]
691+
defer svr.Listener.Close()
692+
for ctx.Err() == nil {
693+
conn, err := svr.Listener.Accept()
694+
if err != nil {
695+
log.Debugf("server accept connect error: %v", err)
696+
return err
697+
}
698+
go svr.Handler.Handle(ctx, conn)
698699
}
700+
return ctx.Err()
701+
}()
702+
}(i)
703+
}
699704

700-
conn, errs := l.Accept()
701-
if errs != nil {
702-
log.Debugf("server accept connect error: %v", errs)
703-
continue
704-
}
705-
go servers[i].Handler.Handle(ctx, conn)
706-
}
707-
})
705+
select {
706+
case err := <-errChan:
707+
return err
708+
case <-ctx.Done():
709+
return ctx.Err()
708710
}
709-
return group.Wait()
710711
}
711712

712713
func Parse(r core.Route) ([]core.Server, error) {

pkg/handler/tools.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ func Complete(ctx context.Context, route *core.Route) error {
3131
if err != nil {
3232
return err
3333
}
34-
resp, err := client.RentIP(context.Background(), &rpc.RentIPRequest{
34+
var resp *rpc.RentIPResponse
35+
resp, err = client.RentIP(context.Background(), &rpc.RentIPRequest{
3536
PodName: os.Getenv(config.EnvPodName),
3637
PodNamespace: ns,
3738
})
@@ -44,6 +45,8 @@ func Complete(ctx context.Context, route *core.Route) error {
4445
err := release(context.Background(), client)
4546
if err != nil {
4647
log.Errorf("release ip failed: %v", err)
48+
} else {
49+
log.Errorf("release ip secuess")
4750
}
4851
}()
4952

@@ -57,7 +60,8 @@ func Complete(ctx context.Context, route *core.Route) error {
5760
return err
5861
}
5962
for i := 0; i < len(route.ServeNodes); i++ {
60-
node, err := core.ParseNode(route.ServeNodes[i])
63+
var node *core.Node
64+
node, err = core.ParseNode(route.ServeNodes[i])
6165
if err != nil {
6266
return err
6367
}

0 commit comments

Comments
 (0)