-
Notifications
You must be signed in to change notification settings - Fork 60
/
Copy pathroute.go
160 lines (152 loc) · 3.53 KB
/
route.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package util
import (
"context"
"fmt"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
v12 "k8s.io/client-go/kubernetes/typed/core/v1"
)
func GetNsForListPodAndSvc(ctx context.Context, clientset *kubernetes.Clientset, nsList []string) (podNs string, svcNs string, err error) {
for _, ns := range nsList {
log.Debugf("List namepsace %s pods", ns)
_, err = clientset.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{Limit: 1})
if apierrors.IsForbidden(err) {
continue
}
if err != nil {
return
}
podNs = ns
break
}
if err != nil {
err = errors.Wrap(err, "can not list pod to add it to route table")
return
}
for _, ns := range nsList {
log.Debugf("List namepsace %s services", ns)
_, err = clientset.CoreV1().Services(ns).List(ctx, metav1.ListOptions{Limit: 1})
if apierrors.IsForbidden(err) {
continue
}
if err != nil {
return
}
svcNs = ns
break
}
if err != nil {
err = errors.Wrap(err, "can not list service to add it to route table")
return
}
return
}
func ListService(ctx context.Context, lister v12.ServiceInterface, addRouteFunc func(ipStr ...string) error) error {
opts := metav1.ListOptions{Limit: 100, Continue: ""}
for {
serviceList, err := lister.List(ctx, opts)
if err != nil {
return err
}
var ips []string
for _, service := range serviceList.Items {
ips = append(ips, service.Spec.ClusterIP)
}
err = addRouteFunc(ips...)
if err != nil {
log.Errorf("Failed to add service IP to route table: %v", err)
}
if serviceList.Continue == "" {
return nil
}
opts.Continue = serviceList.Continue
}
}
func WatchServiceToAddRoute(ctx context.Context, watcher v12.ServiceInterface, routeFunc func(ipStr ...string) error) error {
defer func() {
if er := recover(); er != nil {
log.Error(er)
}
}()
w, err := watcher.Watch(ctx, metav1.ListOptions{Watch: true})
if err != nil {
return err
}
defer w.Stop()
for {
select {
case <-ctx.Done():
return nil
case e, ok := <-w.ResultChan():
if !ok {
return errors.New("watch service chan done")
}
var svc *v1.Service
svc, ok = e.Object.(*v1.Service)
if !ok {
continue
}
_ = routeFunc(svc.Spec.ClusterIP)
}
}
}
func ListPod(ctx context.Context, lister v12.PodInterface, addRouteFunc func(ipStr ...string) error) error {
opts := metav1.ListOptions{Limit: 100, Continue: ""}
for {
podList, err := lister.List(ctx, opts)
if err != nil {
return err
}
var ips []string
for _, pod := range podList.Items {
if pod.Spec.HostNetwork {
continue
}
ips = append(ips, pod.Status.PodIP)
}
err = addRouteFunc(ips...)
if err != nil {
log.Errorf("Failed to add Pod IP to route table: %v", err)
}
if podList.Continue == "" {
return nil
}
opts.Continue = podList.Continue
}
}
func WatchPodToAddRoute(ctx context.Context, watcher v12.PodInterface, addRouteFunc func(ipStrList ...string) error) error {
defer func() {
if er := recover(); er != nil {
log.Errorln(er)
}
}()
w, err := watcher.Watch(ctx, metav1.ListOptions{Watch: true})
if err != nil {
return err
}
defer w.Stop()
for {
select {
case <-ctx.Done():
return nil
case e, ok := <-w.ResultChan():
if !ok {
return fmt.Errorf("watch pod chan done")
}
var pod *v1.Pod
pod, ok = e.Object.(*v1.Pod)
if !ok {
continue
}
if pod.Spec.HostNetwork {
continue
}
ip := pod.Status.PodIP
_ = addRouteFunc(ip)
}
}
}