76.release0.5-01
kube-apiserver
var (
// Note: the weird ""+ in below lines seems to be the only way to get gofmt to
// arrange these text blocks sensibly. Grrr.
port = flag.Int("port", 8080, ""+
"The port to listen on. Default 8080. It is assumed that firewall rules are "+
"set up such that this port is not reachable from outside of the cluster. It is "+
"further assumed that port 443 on the cluster's public address is proxied to this "+
"port. This is performed by nginx in the default setup.")
address = util.IP(net.ParseIP("127.0.0.1"))
publicAddressOverride = flag.String("public_address_override", "", ""+
"Public serving address. Read only port will be opened on this address, "+
"and it is assumed that port 443 at this address will be proxied/redirected "+
"to '-address':'-port'. If blank, the address in the first listed interface "+
"will be used.")
readOnlyPort = flag.Int("read_only_port", 7080, ""+
"The port from which to serve read-only resources. If 0, don't serve on a "+
"read-only address. It is assumed that firewall rules are set up such that "+
"this port is not reachable from outside of the cluster.")
securePort = flag.Int("secure_port", 0, "The port from which to serve HTTPS with authentication and authorization. If 0, don't serve HTTPS ")
tlsCertFile = flag.String("tls_cert_file", "", "File containing x509 Certificate for HTTPS. (CA cert, if any, concatenated after server cert).")
tlsPrivateKeyFile = flag.String("tls_private_key_file", "", "File containing x509 private key matching --tls_cert_file.")
apiPrefix = flag.String("api_prefix", "/api", "The prefix for API requests on the server. Default '/api'.")
storageVersion = flag.String("storage_version", "", "The version to store resources with. Defaults to server preferred")
cloudProvider = flag.String("cloud_provider", "", "The provider for cloud services. Empty string for no provider.")
cloudConfigFile = flag.String("cloud_config", "", "The path to the cloud provider configuration file. Empty string for no configuration file.")
healthCheckMinions = flag.Bool("health_check_minions", true, "If true, health check minions and filter unhealthy ones. Default true.")
eventTTL = flag.Duration("event_ttl", 48*time.Hour, "Amount of time to retain events. Default 2 days.")
tokenAuthFile = flag.String("token_auth_file", "", "If set, the file that will be used to secure the secure port of the API server via token authentication.")
authorizationMode = flag.String("authorization_mode", "AlwaysAllow", "Selects how to do authorization on the secure port. One of: "+strings.Join(apiserver.AuthorizationModeChoices, ","))
authorizationPolicyFile = flag.String("authorization_policy_file", "", "File with authorization policy in csv format, used with --authorization_mode=ABAC, on the secure port.")
etcdServerList util.StringList
etcdConfigFile = flag.String("etcd_config", "", "The config file for the etcd client. Mutually exclusive with -etcd_servers.")
corsAllowedOriginList util.StringList
allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow privileged containers.")
portalNet util.IPNet // TODO: make this a list
enableLogsSupport = flag.Bool("enable_logs_support", true, "Enables server endpoint for log collection")
kubeletConfig = client.KubeletConfig{
Port: 10250,
EnableHttps: false,
}
)
- port : API Server 的监听端口,默认为 8080。
- address : 默认监听地址,这里设为 localhost (127.0.0.1)。
- publicAddressOverride : 公共服务地址,如果为空,则使用第一个列出的接口地址。
- readOnlyPort : 为只读资源提供服务的端口,默认为 7080。
- securePort : 提供带认证和授权的 HTTPS 服务的端口。默认不提供 HTTPS 服务。
- tlsCertFile & tlsPrivateKeyFile : HTTPS 的证书和私钥文件。
- apiPrefix : 服务器上 API 请求的前缀,默认为 "/api"。
- storageVersion : 存储资源的版本。默认为服务器偏好版本。
- cloudProvider & cloudConfigFile : 云服务提供商及其配置文件。
- healthCheckMinions : 是否对 minion 进行健康检查,默认为 true。
- eventTTL : 事件的保留时间,默认为 48 小时。
- tokenAuthFile : 用于通过 token 认证来保护 API Server 安全端口的文件。
- authorizationMode & authorizationPolicyFile : 认证方式及其策略文件。
- etcdServerList & etcdConfigFile : etcd 服务器列表及其配置文件。
- corsAllowedOriginList : 允许的 CORS 源列表。
- allowPrivileged : 是否允许特权容器。
- portalNet : 分配 portal IPs 的 CIDR 表示的 IP 范围。
- enableLogsSupport : 是否启用日志收集的服务器端点。
- kubeletConfig : kubelet 配置。
flag.Parse()
util.InitLogs()
defer util.FlushLogs()
verflag.PrintAndExitIfRequested()
verifyPortalFlags()
if (*etcdConfigFile != "" && len(etcdServerList) != 0) || (*etcdConfigFile == "" && len(etcdServerList) == 0) {
glog.Fatalf("specify either -etcd_servers or -etcd_config")
}
capabilities.Initialize(capabilities.Capabilities{
AllowPrivileged: *allowPrivileged,
})
cloud := cloudprovider.InitCloudProvider(*cloudProvider, *cloudConfigFile)
kubeletClient, err := client.NewKubeletClient(&kubeletConfig)
if err != nil {
glog.Fatalf("Failure to start kubelet client: %v", err)
}
// TODO: expose same flags as client.BindClientConfigFlags but for a server
clientConfig := &client.Config{
Host: net.JoinHostPort(address.String(), strconv.Itoa(int(*port))),
Version: *storageVersion,
}
client, err := client.New(clientConfig)
if err != nil {
glog.Fatalf("Invalid server address: %v", err)
}
helper, err := newEtcd(*etcdConfigFile, etcdServerList)
if err != nil {
glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err)
}
n := net.IPNet(portalNet)
authorizer, err := apiserver.NewAuthorizerFromAuthorizationConfig(*authorizationMode, *authorizationPolicyFile)
if err != nil {
glog.Fatalf("Invalid Authorization Config: %v", err)
}
config := &master.Config{
Client: client,
Cloud: cloud,
EtcdHelper: helper,
HealthCheckMinions: *healthCheckMinions,
EventTTL: *eventTTL,
KubeletClient: kubeletClient,
PortalNet: &n,
EnableLogsSupport: *enableLogsSupport,
EnableUISupport: true,
APIPrefix: *apiPrefix,
CorsAllowedOriginList: corsAllowedOriginList,
TokenAuthFile: *tokenAuthFile,
ReadOnlyPort: *readOnlyPort,
ReadWritePort: *port,
PublicAddress: *publicAddressOverride,
Authorizer: authorizer,
}
m := master.New(config)
flag.Parse()
: 解析命令行参数。所有之前使用flag
定义的变量现在都会被填充。util.InitLogs()
: 初始化日志系统,使得日志能够被写入。defer util.FlushLogs()
: 确保程序结束前刷新日志。defer
关键字使得此语句在包含它的函数(通常是main
)返回之前执行。verflag.PrintAndExitIfRequested()
: 检查命令行参数是否请求显示版本信息。如果是,则显示版本信息并退出。verifyPortalFlags()
: 验证portalNet
参数是否正确配置。- 验证 etcd 配置:只能提供
etcdConfigFile
或etcdServerList
中的一个,不能都提供或都不提供。 capabilities.Initialize(...)
: 初始化 API Server 的能力,例如是否允许特权容器。cloudprovider.InitCloudProvider(...)
: 初始化云提供商接口。这允许 Kubernetes 与各种云提供商(如 AWS、GCE、Azure 等)交互。- 创建
kubeletClient
: 与 kubelet 交互的客户端。 - 设置
clientConfig
并创建一个新的 Kubernetes 客户端:用于与 API Server 交互。 newEtcd(...)
: 初始化 etcd 的帮助函数,用于数据存储。- 将
portalNet
从自定义类型转换为标准的net.IPNet
。 - 创建一个新的授权器,根据提供的配置确定如何进行 API 访问授权。
- 初始化 master 的配置:这涵盖了 API Server 运行所需的大多数配置。
master.New(config)
: 使用上面的配置创建一个新的 Kubernetes master,它处理所有核心的 API 请求。
clientConfig := &client.Config{
Host: net.JoinHostPort(address.String(), strconv.Itoa(int(*port))),
Version: *storageVersion,
}
client, err := client.New(clientConfig)
if err != nil {
glog.Fatalf("Invalid server address: %v", err)
}
// New creates a Kubernetes client for the given config. This client works with pods,
// replication controllers and services. It allows operations such as list, get, update
// and delete on these objects. An error is returned if the provided configuration
// is not valid.
func New(c *Config) (*Client, error) {
config := *c
if config.Prefix == "" {
config.Prefix = "/api"
}
client, err := RESTClientFor(&config)
if err != nil {
return nil, err
}
return &Client{client}, nil
}
初始化 client ,我们来看看 RESTClientFor
// RESTClientFor returns a RESTClient that satisfies the requested attributes on a client Config
// object.
func RESTClientFor(config *Config) (*RESTClient, error) {
version := defaultVersionFor(config)
// Set version
versionInterfaces, err := latest.InterfacesFor(version)
if err != nil {
return nil, fmt.Errorf("API version '%s' is not recognized (valid values: %s)", version, strings.Join(latest.Versions, ", "))
}
baseURL, err := defaultServerUrlFor(config)
if err != nil {
return nil, err
}
client := NewRESTClient(baseURL, versionInterfaces.Codec)
transport, err := TransportFor(config)
if err != nil {
return nil, err
}
if transport != http.DefaultTransport {
client.Client = &http.Client{Transport: transport}
}
return client, nil
}
- 这个函数
RESTClientFor
的目的是为了根据给定的Config
返回一个符合要求属性的RESTClient
。让我们深入了解其内部工作:
- 版本确定
首先,函数调用
defaultVersionFor(config)
来确定要使用的 API 版本。这里的defaultVersionFor
似乎是一个辅助函数,它可能会检查config
中是否指定了版本,并可能返回默认版本。 - 检查版本
函数接着使用
latest.InterfacesFor(version)
来检查所选版本是否有效并获取相应的接口。这里的latest
可能是一个全局变量或者包级别的对象,它知道所有支持的 API 版本。如果版本无效,函数会返回一个错误。 - 获取基本 URL
然后,它调用
defaultServerUrlFor(config)
来获取服务器的基本 URL。这个函数可能与我们之前看到的DefaultServerURL
类似,但可能根据Config
中的其他信息进行了更多的处理。 - 创建新的 RESTClient
一旦有了基本的 URL 和正确的编解码器(从
versionInterfaces.Codec
获取),函数就会使用它们创建一个新的RESTClient
。 - 设置传输
接下来,函数使用
TransportFor(config)
来为客户端设置传输。这可能涉及到处理各种配置选项,如代理设置、TLS 证书、身份验证等。 如果得到的传输与默认的 HTTP 传输不同,那么它将为RESTClient
的内部 HTTP 客户端设置这个新的传输。 - 返回客户端
最后,函数返回这个配置好的
RESTClient
。
// defaultServerUrlFor is shared between IsConfigTransportTLS and RESTClientFor
func defaultServerUrlFor(config *Config) (*url.URL, error) {
version := defaultVersionFor(config)
// TODO: move the default to secure when the apiserver supports TLS by default
// config.Insecure is taken to mean "I want HTTPS but don't bother checking the certs against a CA."
defaultTLS := config.CertFile != "" || config.Insecure
host := config.Host
if host == "" {
host = "localhost"
}
return DefaultServerURL(host, config.Prefix, version, defaultTLS)
}
// DefaultServerURL converts a host, host:port, or URL string to the default base server API path
// to use with a Client at a given API version following the standard conventions for a
// Kubernetes API.
func DefaultServerURL(host, prefix, version string, defaultTLS bool) (*url.URL, error) {
if host == "" {
return nil, fmt.Errorf("host must be a URL or a host:port pair")
}
if version == "" {
return nil, fmt.Errorf("version must be set")
}
base := host
hostURL, err := url.Parse(base)
if err != nil {
return nil, err
}
if hostURL.Scheme == "" {
scheme := "http://"
if defaultTLS {
scheme = "https://"
}
hostURL, err = url.Parse(scheme + base)
if err != nil {
return nil, err
}
if hostURL.Path != "" && hostURL.Path != "/" {
return nil, fmt.Errorf("host must be a URL or a host:port pair: %s", base)
}
}
// If the user specified a URL without a path component (http://server.com), automatically
// append the default prefix
if hostURL.Path == "" {
if prefix == "" {
prefix = "/"
}
hostURL.Path = prefix
}
// Add the version to the end of the path
hostURL.Path = path.Join(hostURL.Path, version)
return hostURL, nil
}
- 检查
host
和version
。如果它们中的任何一个是空的,函数会返回错误。 - 尝试解析
host
到一个 URL。如果它不是一个有效的 URL,url.Parse
会返回错误。 - 如果解析的 URL 没有指定协议(也称为方案),函数将为其选择一个默认的。如果
defaultTLS
是true
,则选择 "https://";否则选择 "http://"。然后再次尝试解析这个带有方案的新 URL。 - 检查解析的 URL 的路径是否存在。如果用户指定了一个没有路径的 URL(例如,http://server.com),则会自动追加默认的前缀。
- 最后,函数会将版本添加到路径的末尾。
- 返回构建的 URL。
一个实际的应用示例: 假设我们有以下参数调用函数:
- host:
example.com:8080
- prefix:
/api
- version:
v1
- defaultTLS:
false
返回的 URL 将是 http://example.com:8080/api/v1
。
// Attributes implements authorizer.Attributes interface.
type Attributes struct {
// TODO: add fields and methods when authorizer.Attributes is completed.
}
// alwaysAllowAuthorizer is an implementation of authorizer.Attributes
// which always says yes to an authorization request.
// It is useful in tests and when using kubernetes in an open manner.
type alwaysAllowAuthorizer struct{}
func (alwaysAllowAuthorizer) Authorize(a authorizer.Attributes) (err error) {
return nil
}
func NewAlwaysAllowAuthorizer() authorizer.Authorizer {
return new(alwaysAllowAuthorizer)
}
// alwaysDenyAuthorizer is an implementation of authorizer.Attributes
// which always says no to an authorization request.
// It is useful in unit tests to force an operation to be forbidden.
type alwaysDenyAuthorizer struct{}
func (alwaysDenyAuthorizer) Authorize(a authorizer.Attributes) (err error) {
return errors.New("Everything is forbidden.")
}
func NewAlwaysDenyAuthorizer() authorizer.Authorizer {
return new(alwaysDenyAuthorizer)
}
const (
ModeAlwaysAllow string = "AlwaysAllow"
ModeAlwaysDeny string = "AlwaysDeny"
ModeABAC string = "ABAC"
)
// Keep this list in sync with constant list above.
var AuthorizationModeChoices = []string{ModeAlwaysAllow, ModeAlwaysDeny, ModeABAC}
// NewAuthorizerFromAuthorizationConfig returns the right sort of authorizer.Authorizer
// based on the authorizationMode xor an error. authorizationMode should be one of AuthorizationModeChoices.
func NewAuthorizerFromAuthorizationConfig(authorizationMode string, authorizationPolicyFile string) (authorizer.Authorizer, error) {
if authorizationPolicyFile != "" && authorizationMode != "ABAC" {
return nil, errors.New("Cannot specify --authorization_policy_file without mode ABAC")
}
// Keep cases in sync with constant list above.
switch authorizationMode {
case ModeAlwaysAllow:
return NewAlwaysAllowAuthorizer(), nil
case ModeAlwaysDeny:
return NewAlwaysDenyAuthorizer(), nil
case ModeABAC:
return abac.NewFromFile(authorizationPolicyFile)
default:
return nil, errors.New("Unknown authorization mode")
}
}
- Attributes 结构 :这是一个实现
authorizer.Attributes
接口的结构,但它目前是空的,并有一个待办事项说明要在完善authorizer.Attributes
接口后添加字段和方法。 - alwaysAllowAuthorizer 结构 :这是
authorizer.Attributes
的一个实现,无论何时都会同意授权请求。这在测试中很有用,也在开放的 Kubernetes 使用场景中很有用。 - alwaysDenyAuthorizer 结构 :与
alwaysAllowAuthorizer
相反,这是另一个实现,它总是拒绝授权请求。这在单元测试中很有用,用于强制某个操作被禁止。 - 授权模式常量 :定义了几个字符串常量来表示授权模式,如 "AlwaysAllow", "AlwaysDeny", 和 "ABAC"。
- NewAuthorizerFromAuthorizationConfig 函数 :这个函数是基于传入的
authorizationMode
和authorizationPolicyFile
参数来返回合适的authorizer.Authorizer
。如果authorizationMode
不在预定义的模式中,函数会返回一个错误。这是一个工厂方法,用于根据所选模式创建合适的授权器。