• 企业400电话
  • 微网小程序
  • AI电话机器人
  • 电商代运营
  • 全 部 栏 目

    企业400电话 网络优化推广 AI电话机器人 呼叫中心 网站建设 商标✡知产 微网小程序 电商运营 彩铃•短信 增值拓展业务
    python中bottle使用实例代码

    模仿学习同事的代码来写的,主要是搞懂python中如何来组织包,如何调用包,如何读取配置文件,连接数据库,设置路由,路由分组。(注:使用的是python3.6)

    整体目录设计如下:

    根据调用层级从上往下来说:

    首先项目根目录下的main.py是整个程序的入口,主要作用启动http服务器,调用分组路由。

    main.py

    import bottle
    from confg.conf import CONF
    from api.user import User
     
    db_url = CONF.db.url
     
    default_app = bottle.default_app()
     
    #相当于分组路由
    default_app.mount("/user", User(db_url, "").app)
     
    app = default_app
     
    if __name__ == '__main__':
        bottle.run(app=app,
                   host="localhost",
                   port="8000")

    接着是controller层,就是api目录。api目录包括service文件夹和api下的文件。(注:一般来说controller层,service层是同级的,本项目其实api下的非service文件都是属于controller层,所以还是同一层的,因为要遵守调用顺序,不然可能会发生循环调用)。

    /api/user.py文件

    import logging
     
    from bottle import request
    #db数据库引擎
    from common.base import DB
    #调用service层
    from api.service.user import UserService
     
    logger = logging.getLogger("arview")
     
     
    class User(DB, UserService):
     
        def __init__(self, *args, **kwargs):
            print(">>> User init begin")
            logging.debug('>>> User init begin')
            super(User, self).__init__(*args, **kwargs)
            self.dispatch()
            logger.debug('>>> User init end')
     
        def create(self, db=None):
            create_body = request.json
            create_data = self.create_user(create_body, db)
            return create_data
     
        def delete(self, db=None):
            delete_body = request.json
            delete_data = self.delete_user(delete_body, db)
            return delete_data
     
        def list(self, db=None):
     
            list_data = self.list_user(db)
            return list_data
        #相当于分组路由
        def dispatch(self):
            self.app.route('/listUser', method='post')(self.list)
            self.app.route('/createUser', method='post')(self.create)
            self.app.route('/deleteUser', method='post')(self.delete)

    /service/user.py

    import time
    #model层
    from db.models.user import UserModel
     
     
    class UserService(object):
        def list_user(self, db):
            user_info_list = db.query(UserModel).all()
            for item in user_info_list:
                print(item.username)
            return user_info_list
     
        def create_user(self, create_body, db):
            user_model = UserModel(
                username=create_body.get("username"),
                password=create_body.get("password"),
                role=create_body.get("role"),
                create_time=time.time()
            )
            db.add(user_model)
            db.commit()
            return "success"
     
        def delete_user(self, delete_body, db):
            db.query(UserModel).filter(UserModel.id == (delete_body["id"])).delete()
            db.commit()
            return delete_body

    然后是dao层也就是数据库操作层(但是明显虽然有dao层但是数据库操作的逻辑已经在service层里了)

    最后是读取配置文件和创建数据库引擎。

    读取配置文件使用的包是oslo_config。

    conf.py

    # coding:utf8
    # from __future__ import print_function
    from oslo_config import cfg
     
    DEFAULT_ARVIEW_DB_NAME = 'ginblog'
    DEFAULT_ARVIEW_DB_USER = 'root'
    DEFAULT_ARVIEW_DB_USER_PASSWORD = '33demysql'
    DEFAULT_ARVIEW_DB_HOST = '81.68.179.136'
    DEFAULT_ARVIEW_DB_PORT = 3306
    DEFAULT_ARVIEW_DB_URL_TEMPLATE = 'mysql+mysqlconnector://{}:{}@' \
    
                                     '{}:{}/{}?charset=utf8'
    DEFAULT_ARVIEW_DB_URL = DEFAULT_ARVIEW_DB_URL_TEMPLATE.format(
        DEFAULT_ARVIEW_DB_USER,
        DEFAULT_ARVIEW_DB_USER_PASSWORD,
        DEFAULT_ARVIEW_DB_HOST,
        DEFAULT_ARVIEW_DB_PORT,
        DEFAULT_ARVIEW_DB_NAME)
     
    # 声明参数选项
    opt_group = cfg.OptGroup('keystone_authtoken')
    mysql_opt_group = cfg.OptGroup('db')
     
    auth_opts = [
        cfg.StrOpt('memcached_servers',
                   default='localhost:11211',
                   choices=("localhost:11211", "0.0.0.0:11211"),
                   help=('localhost local', '0.0.0.0 So listen')
                   ),
     
        cfg.StrOpt('signing_dir',
                   default='/var/cache/cinder',
                   choices=("/var/cache/cinder", "/var/cache/cinder"),
                   ),
    ]
     
    # mysql
    mysql_opts = [
        cfg.StrOpt('url', default=DEFAULT_ARVIEW_DB_URL),
        cfg.StrOpt('Db', default='3mysql'),
        cfg.StrOpt('DbHost', default='381.68.179.136'),
        cfg.StrOpt('DbPort', default='33306'),
        cfg.StrOpt('DbUser', default='3DbUser'),
        cfg.StrOpt('DbPassWord', default='3DbPassWord'),
        cfg.StrOpt('DbName', default='3DbName'),
        cfg.BoolOpt('create', default=False),
        cfg.BoolOpt('commit', default=True),
        cfg.BoolOpt('echo', default=True, help='是否显示回显'),
        cfg.BoolOpt('echo_pool', default=False, help='数据库连接池是否记录 checkouts/checkins操作'),
        cfg.IntOpt('pool_size', default=1000, help='数据库连接池中保持打开的连接数量'),
        cfg.IntOpt('pool_recycle', default=600, help='数据库连接池在连接被创建多久(单位秒)以后回收连接')
    ]
     
    token_opts = [
        cfg.StrOpt('project_domain_name'),
        cfg.StrOpt('project_name'),
    ]
     
    CINDER_OPTS = (auth_opts +
                   token_opts)
    MYSQLCINDER_OPTS = (mysql_opts)
     
    # 注册参数选项
    CONF = cfg.CONF
    # 注册组
    CONF.register_group(opt_group)
    CONF.register_group(mysql_opt_group)
     
    # 将各个选项注册进组里
    CONF.register_opts(CINDER_OPTS, group=opt_group)
    CONF.register_opts(MYSQLCINDER_OPTS, group=mysql_opt_group)
     
    if __name__ == "__main__":
        # 要读取哪个配置文件
        CONF(default_config_files=['cinder.conf'])
        print('mysql Db配置组为%s' % (CONF.db.Db))
        print('mysql DbHost%s' % (CONF.db.DbHost))
        print('mysql DbPort配置组为%s' % (CONF.db.DbPort))
        print('mysql DbUser%s' % (CONF.db.DbUser))

    配置文件cinder.conf

    [db]
    Db = mysql
    DbHost = 81.68.179.136
    DbPort = 3306
    DbUser = root
    DbPassWord = 33demysql
    DbName = ginblog
    create = false
    commit = true
    echo = false
    echo_pool = false
    pool_size = 1000
    pool_recycle =600

    它的使用方法是,先声明参数选项就是(相当于声明组)

    mysql_opt_group = cfg.OptGroup('db'),

    然后声明组内的选项,

    mysql_opts = [
        cfg.StrOpt('url', default=DEFAULT_ARVIEW_DB_URL),
        cfg.StrOpt('Db', default='3mysql'),
        cfg.StrOpt('DbHost', default='381.68.179.136'),
        cfg.StrOpt('DbPort', default='33306'),
        cfg.StrOpt('DbUser', default='3DbUser'),
        cfg.StrOpt('DbPassWord', default='3DbPassWord'),
        cfg.StrOpt('DbName', default='3DbName'),
        cfg.BoolOpt('create', default=False),
        cfg.BoolOpt('commit', default=True),
        cfg.BoolOpt('echo', default=True, help='是否显示回显'),
        cfg.BoolOpt('echo_pool', default=False, help='数据库连接池是否记录 checkouts/checkins操作'),
        cfg.IntOpt('pool_size', default=1000, help='数据库连接池中保持打开的连接数量'),
        cfg.IntOpt('pool_recycle', default=600, help='数据库连接池在连接被创建多久(单位秒)以后回收连接')
    ]

    拼接选项

    MYSQLCINDER_OPTS = (mysql_opts)

    接着注册组,

    CONF.register_group(mysql_opt_group)

    最后将选项注册进组。

    CONF.register_opts(MYSQLCINDER_OPTS, group=mysql_opt_group)

    当然最重要的注册参数选项,我的理解就是暴露句柄。

    # 注册参数选项
    CONF = cfg.CONF

    然后创建数据库引擎

    common/utils/sqlalchemy_util.py

    import logging
    from json import loads as json_loads
     
    from sqlalchemy.engine import create_engine
    from sqlalchemy.pool import QueuePool
    from confg import CONF
     
    SQLALCHEMY_ENGINE_CONTAINER = {}
     
    logger = logging.getLogger("arview")
     
     
    def json_deserializer(s, **kw):
        if isinstance(s, bytes):
            return json_loads(s.decode('utf-8'), **kw)
     
        else:
            return json_loads(s, **kw)
     
     
    def get_sqlalchemy_engine(db_url):
        if db_url not in SQLALCHEMY_ENGINE_CONTAINER:
            engine = create_engine(db_url, echo=CONF.db.echo,
                                   # pool_pre_ping如果值为True,那么每次从连接池中拿连接的时候,都会向数据库发送一个类似
                                   # select 1的测试查询语句来判断服务器是否正常运行。当该连接出现disconnect的情况时,
                                   # 该连接连同pool中的其它连接都会被回收
                                   pool_pre_ping=True,
                                   echo_pool=CONF.db.echo_pool,
                                   pool_size=CONF.db.pool_size,
                                   pool_recycle=CONF.db.pool_recycle,
                                   json_deserializer=json_deserializer,
                                   poolclass=QueuePool)
            logger.info('Create sqlalchemy engine %s', engine)
            SQLALCHEMY_ENGINE_CONTAINER[db_url] = engine
     
        return SQLALCHEMY_ENGINE_CONTAINER[db_url]

    这里引用配置文件的数据,直接引入CONF

    from confg import CONF

    然后使用

    CONF.db.echo_pool

    创建句柄,

     与我之前使用的方法不同的是,这里的数据库引擎不需要在使用的地方引入了,会在main里注册路由分组时,通过plugin插件自动将数据库引擎导入。这也是我有点搞不懂的地方,虽然更方便,但是不知道就很难知道了,问了同事才知道是怎么回事。

    bottle源码

    def install(self, plugin):
        ''' Add a plugin to the list of plugins and prepare it for being
            applied to all routes of this application. A plugin may be a simple
            decorator or an object that implements the :class:`Plugin` API.
        '''

    plugin就是相当与golang的中间件,不过作用范围是全部路由。

    这里创建数据库句柄并使用是一个比较绕的过程。总体思路:

    1.写一个bottle plugin,创建数据库句柄,然后install安装这个plugin。就可以在所有的路由中自动引入这个插件(就是不用在包里在导入db句柄了,bottle会自动导入)。

    /common/base.py 创建plugin并安装

    import logging
    from bottle import Bottle
    from confg.conf import CONF
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker, scoped_session
    from db.models.base import Base as ApiModelBase
    from common.utils.sqlalchemy_util import get_sqlalchemy_engine
    from bottle_sqlalchemy import SQLAlchemyPlugin
     
    logger = logging.getLogger("arview")
    base = ApiModelBase  # sqlalchemy orm base class
     
     
    class Plugins:
        SQLALCHEMY_PLUGIN = None  # sqlalchemy plugin, global only one instance
        APSCHEDULER_PLUGIN = None  # apsechduler plugin. global only one instance
     
     
    class Base(object):
        def __init__(self, *args, **kwargs):
            logger.debug('>>>> Base init begin')
            self.app = Bottle()
            # self.app.install(SwaggerPlugin(self._type))
     
            logger.debug('>>>> Base init end')
     
     
    class DB(Base):
        def __init__(self, db_url, create=None, commit=None, *args, **kwargs):
            print('db_url:', db_url)
            super(DB, self).__init__(*args, **kwargs)
     
            if create is None:
                create = CONF.db.create
            if commit is None:
                commit = CONF.db.commit
     
            if Plugins.SQLALCHEMY_PLUGIN is None:
                Plugins.SQLALCHEMY_PLUGIN = _create_sqlalchemy_plugin(db_url, create=create, commit=commit)
            self.app.install(Plugins.SQLALCHEMY_PLUGIN)
            logger.debug("Install plugin: sqlalchemy.")
            # if CONF.api.enable_request_interval_plugin:
            #     self.app.install(RequestTimeIntervalPlugin())
            logger.debug('>>>> DB init end')
     
     
    class CommonBase(object):
        def __init__(self):
            self._db = None
     
        @property
        def db(self):
            if not self._db:
                DBURL = "mysql+mysqlconnector://{}:{}@{}:{}/{}?charset=utf8".format(CONF.mysql.DbUser,
                                                                                    CONF.mysql.DbPassWord,
                                                                                    CONF.mysql.DbHost,
                                                                                    CONF.mysql.DbPort,
                                                                                    CONF.mysql.DbName)
                engine = create_engine(DBURL, echo=False)
                self._db = sessionmaker()(bind=engine)
            return self._db
     
        @db.deleter
        def db(self):
            if self._db:
                self._db.commit()
                self._db.close()
                self._db = None
     
     
    def _create_sqlalchemy_plugin(db_url, create, commit):
        """
        创建sqlalchemy插件
        :param db_url:
        :param echo:
        :param create:
        :param commit:
        :return:
        """
     
        logger.debug('>>>> create sqlalchemy plugin begin')
        engine = get_sqlalchemy_engine(db_url)
        plugin = SQLAlchemyPlugin(engine, metadata=ApiModelBase.metadata, create=create, commit=commit, use_kwargs=True)
        logger.debug('>>>> create sqlalchemy plugin %s' % plugin)
        return plugin

    最后使用

    /api/user.py

    import logging
     
    from bottle import request
     
    from common.base import DB
    from api.service.user import UserService
     
    logger = logging.getLogger("arview")
     
     
    class User(DB, UserService):
     
        def __init__(self, *args, **kwargs):
            print(">>> User init begin")
            logging.debug('>>> User init begin')
            super(User, self).__init__(*args, **kwargs)
            self.dispatch()
            logger.debug('>>> User init end')
     
        def create(self, db=None):
            create_body = request.json
            create_data = self.create_user(create_body, db)
            return create_data
     
        def delete(self, db=None):
            delete_body = request.json
            delete_data = self.delete_user(delete_body, db)
            return delete_data
     
        def list(self, db=None):
     
            list_data = self.list_user(db)
            return list_data
     
        def dispatch(self):
            self.app.route('/listUser', method='post')(self.list)
            self.app.route('/createUser', method='post')(self.create)
            self.app.route('/deleteUser', method='post')(self.delete)

    这里的db就不需要导入了,可以直接使用。

    db层
    主要是模型层 /db/model/user.py

    from sqlalchemy import Column, String, Enum, TIMESTAMP, Boolean, Integer, BIGINT, DATETIME
     
    from db.models.base import Base
     
     
    class UserModel(Base):
        __tablename__ = "user"
        id = Column("id", BIGINT, primary_key=True, comment="用户id")
        created_at = Column("created_at", DATETIME, comment="创建时间")
        updated_at = Column("updated_at", DATETIME, comment="更新时间")
        deleted_at = Column("deleted_at", DATETIME, comment="删除时间")
        username = Column("username", String(20), comment="用户名")
        password = Column("password", String(500), comment="密码")
        role = Column("role", BIGINT, comment="角色")
     
        def __init__(self, id, created_at, updated_at, deleted_at, username, password, role):
            self.id = id
            self.created_at = created_at
            self.updated_at = updated_at
            self.deleted_at = deleted_at
            self.username = username
            self.password = password
            self.role = role

    /db/model/base.py

    from datetime import datetime
     
    from sqlalchemy import Column, TIMESTAMP
    from sqlalchemy.ext.declarative import declarative_base
     
     
    # sqlalchemy orm base class
    Base = declarative_base()
     
     
    class TimestampMixin(object):
        """为ORM提供时间戳基类"""
     
        created_at = Column('created_at', TIMESTAMP(True), default=datetime.now,
                            comment=u"创建时间")
        updated_at = Column('updated_at', TIMESTAMP(True), default=datetime.now,
                            onupdate=datetime.now, comment=u"更新时间")

    到此这篇关于python bottle使用实例的文章就介绍到这了,更多相关python bottle使用内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

    您可能感兴趣的文章:
    • Python使用Py2neo创建Neo4j的节点、关系及路径
    • Python使用py2neo操作图数据库neo4j的方法详解
    • python利用文件读写编写一个博客
    • 手把手带你用python爬取小姐姐私房照
    • Python time.time()方法
    • Python接口自动化之接口依赖
    • python使用py2neo查询Neo4j的节点、关系及路径
    上一篇:Python爬虫技术
    下一篇:Django将项目移动到新环境的操作步骤
  • 相关文章
  • 

    © 2016-2020 巨人网络通讯 版权所有

    《增值电信业务经营许可证》 苏ICP备15040257号-8

    python中bottle使用实例代码 python,中,bottle,使用,实例,