• 企业400电话
  • 微网小程序
  • AI电话机器人
  • 电商代运营
  • 全 部 栏 目

    企业400电话 网络优化推广 AI电话机器人 呼叫中心 网站建设 商标✡知产 微网小程序 电商运营 彩铃•短信 增值拓展业务
    Python使用Kubernetes API访问集群

    通过将身份认证令牌直接传给 API 服务器,可以避免使用 kubectl 代理,像这样:
    使用 grep/cut 方式:

    # 查看所有的集群,因为你的 .kubeconfig 文件中可能包含多个上下文
    kubectl config view -o jsonpath='{"Cluster name\tServer\n"}{range .clusters[*]}{.name}{"\t"}{.cluster.server}{"\n"}{end}'
    
    # 从上述命令输出中选择你要与之交互的集群的名称
    export CLUSTER_NAME="some_server_name"
    
    # 指向引用该集群名称的 API 服务器
    APISERVER=$(kubectl config view -o jsonpath="{.clusters[?(@.name==\"$CLUSTER_NAME\")].cluster.server}")
    
    # 获得令牌
    TOKEN=$(kubectl get secrets -o jsonpath="{.items[?(@.metadata.annotations['kubernetes\.io/service-account\.name']=='default')].data.token}"|base64 -d)
    
    # 使用令牌玩转 API
    curl -X GET $APISERVER/api --header "Authorization: Bearer $TOKEN" --insecure
    

    客户端库:https://kubernetes.io/zh/docs/reference/using-api/client-libraries/

    python举例:

    目录结构

    配置文件两种方式

    1、将集群中的~/.kube/config,重命名为kubeconfig.yaml
    代码:

    from kubernetes import  client,config
    from kubernetes.stream import stream
    import yaml
    config_file = r"D:\Users\JackHe\PycharmProjects\JJ\k8s\auth\kubeconfig.yaml"
    config.kube_config.load_kube_config(config_file=config_file)
    Api_Instance = client.CoreV1Api()
    Api_Batch = client.BatchV1Api()
    
    #列出所有的namesapce
    for ns in Api_Instance.list_namespace().items:
        print(ns.metadata.name)
    
    #列出所有的nodes
    def list_node():
        api_response = Api_Instance.list_node()
        data = {}
        for i in api_response.items:
            data[i.metadata.name] = {"name": i.metadata.name,
                                    "status": i.status.conditions[-1].type if i.status.conditions[-1].status == "True" else "NotReady",
                                    "ip": i.status.addresses[0].address,
                                    "kubelet_version": i.status.node_info.kubelet_version,
                                    "os_image": i.status.node_info.os_image,
                                     }
        return data
    nodes = list_node()
    print(nodes)
    
    

    2、使用token形式,获取命令上文所示。
    代码:

    # -*- coding: utf-8 -*-
    from kubernetes.client import api_client
    from kubernetes.client.apis import core_v1_api
    from kubernetes import client,config
    
    
    class KubernetesTools(object):
        def __init__(self):
            self.k8s_url = 'https://192.168.1.56:6443'
    
        def get_token(self):
            """
            获取token
            :return:
            """
            with open(r'D:\Users\JackHe\PycharmProjects\JJ\k8s\auth\token', 'r') as file:
                Token = file.read().strip('\n')
                return Token
    
        def get_api(self):
            """
            获取API的CoreV1Api版本对象
            :return:
            """
            configuration = client.Configuration()
            configuration.host = self.k8s_url
            configuration.verify_ssl = False
            configuration.api_key = {"authorization": "Bearer " + self.get_token()}
            client1 = api_client.ApiClient(configuration=configuration)
            api = core_v1_api.CoreV1Api(client1)
            return api
    
        def get_namespace_list(self):
            """
            获取命名空间列表
            :return:
            """
            api = self.get_api()
            namespace_list = []
            for ns in api.list_namespace().items:
                # print(ns.metadata.name)
                namespace_list.append(ns.metadata.name)
    
            return namespace_list
    
        def get_pod_list(self):
           api = self.get_api()
           print("Listing pods with their IPs:")
           ret = api.list_pod_for_all_namespaces(watch=False)
           for i in ret.items:
               print("%s\t%s\t%s" % (i.status.pod_ip, i.metadata.namespace, i.metadata.name))
    
        def get_service_list(self):
            api = self.get_api()
            ret = api.list_service_for_all_namespaces(watch=False)
            for i in ret.items:
                print("%s \t%s \t%s \t%s \t%s \n" %(i.kind,i.metadata.namespace,i.metadata.name,i.spec.cluster_ip,i.spec.ports))
    
    if __name__ == '__main__':
        namespace_list = KubernetesTools().get_namespace_list()
        pod_list = KubernetesTools().get_pod_list()
        service = KubernetesTools().get_service_list()
        print(namespace_list)
        print(pod_list)
        print(service)

    到此这篇关于Python使用Kubernetes API访问集群的文章就介绍到这了,更多相关Python Kubernetes API访问集群内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

    您可能感兴趣的文章:
    • python3 kubernetes api的使用示例
    上一篇:如何利用pygame实现打飞机小游戏
    下一篇:python必学知识之文件操作(建议收藏)
  • 相关文章
  • 

    © 2016-2020 巨人网络通讯 版权所有

    《增值电信业务经营许可证》 苏ICP备15040257号-8

    Python使用Kubernetes API访问集群 Python,使用,Kubernetes,API,访问,