64.proxy-01
main
func main() {
flag.Parse()
util.InitLogs()
defer util.FlushLogs()
verflag.PrintAndExitIfRequested()
serviceConfig := config.NewServiceConfig()
endpointsConfig := config.NewEndpointsConfig()
// define api config source
glog.Infof("Using api calls to get config %v", clientConfig.Host)
client, err := client.New(clientConfig)
if err != nil {
glog.Fatalf("Invalid API configuration: %v", err)
}
config.NewSourceAPI(
client,
30*time.Second,
serviceConfig.Channel("api"),
endpointsConfig.Channel("api"),
)
}
loadBalancer := proxy.NewLoadBalancerRR()
proxier := proxy.NewProxier(loadBalancer, net.IP(bindAddress))
// Wire proxier to handle changes to services
serviceConfig.RegisterHandler(proxier)
// And wire loadBalancer to handle changes to endpoints to services
endpointsConfig.RegisterHandler(loadBalancer)
// Just loop forever for now...
select {}
}
- 解析命令行标志 : 使用
flag.Parse()
解析传递给程序的命令行参数。 - 初始化和刷新日志 : 使用
util.InitLogs()
进行日志初始化,并通过defer util.FlushLogs()
确保在程序退出时将所有挂起的日志写入。 - 打印版本信息 : 如果请求,
verflag.PrintAndExitIfRequested()
将打印版本信息并退出程序。 - 创建服务和端点配置 : 使用
config.NewServiceConfig()
和config.NewEndpointsConfig()
创建服务和端点的配置对象。 - 定义API配置源 : 这个部分使用Kubernetes API客户端来创建一个源,以便从Kubernetes API服务器获取配置信息。
- 创建负载均衡器和代理器 : 通过
proxy.NewLoadBalancerRR()
创建一个基于轮询算法的负载均衡器,然后通过proxy.NewProxier()
创建一个新的代理器实例,用于处理传入的连接。 - 注册处理程序 : 使用
serviceConfig.RegisterHandler(proxier)
和endpointsConfig.RegisterHandler(loadBalancer)
将代理器和负载均衡器与相应的服务和端点配置注册。 - 无限循环 : 最后,
select {}
语句使程序保持活动状态,等待服务和端点的更改。
serviceConfig
// ServiceConfig tracks a set of service configurations.
// It accepts "set", "add" and "remove" operations of services via channels, and invokes registered handlers on change.
type ServiceConfig struct {
mux *config.Mux
watcher *config.Watcher
store *serviceStore
}
type Watcher struct {
// Listeners for changes and their lock.
listenerLock sync.RWMutex
listeners []Listener
}
我们可以看到相比 kubelet 的 podConfig ,这里多了个 watcher ,实际上用了订阅设计模式,当配置有变化,会通知到每个 listener,然后listener 调用 Onupdate方法。
// OnUpdate manages the active set of service proxies.
// Active service proxies are reinitialized if found in the update set or
// shutdown if missing from the update set.
func (proxier *Proxier) OnUpdate(services []api.Service) {
glog.V(4).Infof("Received update notice: %+v", services)
activeServices := util.StringSet{}
for _, service := range services {
activeServices.Insert(service.ID)
info, exists := proxier.getServiceInfo(service.ID)
// TODO: check health of the socket? What if ProxyLoop exited?
if exists && info.isActive() && info.port == service.Port {
continue
}
if exists && info.port != service.Port {
err := proxier.stopProxy(service.ID, info)
if err != nil {
glog.Errorf("error stopping %s: %v", service.ID, err)
}
}
glog.V(3).Infof("Adding a new service %s on %s port %d", service.ID, service.Protocol, service.Port)
sock, err := newProxySocket(service.Protocol, proxier.address, service.Port)
if err != nil {
glog.Errorf("Failed to get a socket for %s: %+v", service.ID, err)
continue
}
proxier.setServiceInfo(service.ID, &serviceInfo{
port: service.Port,
protocol: service.Protocol,
active: true,
socket: sock,
timeout: udpIdleTimeout,
})
proxier.startAccepting(service.ID, sock)
}
proxier.mu.Lock()
defer proxier.mu.Unlock()
for name, info := range proxier.serviceMap {
if !activeServices.Has(name) {
err := proxier.stopProxyInternal(name, info)
if err != nil {
glog.Errorf("error stopping %s: %v", name, err)
}
}
}
}