OpenStack Neutron Server 启动过程

2020/08/03 OpenStack Neutron

1. neutron server 启动过程

pike 版本

neutron-server 入口:setup.cfgneutron-server = neutron.cmd.eventlet.server:main

  1. neutron\cmd\eventlet\server\__init__.py
def main():
    server.boot_server(wsgi_eventlet.eventlet_wsgi_server)


def main_rpc_eventlet():
    server.boot_server(rpc_eventlet.eventlet_rpc_server)

boot_server() 方法在 neutron.server.__init__.py 中,该函数主要是加载配置文件 _init_configuration() ;调用 server_func() 即: wsgi_eventlet.eventlet_wsgi_server

  1. neutron\server\__init__.py:
def _init_configuration():
    # the configuration will be read into the cfg.CONF global data structure
    config.init(sys.argv[1:])
    config.setup_logging()
    config.set_config_defaults()
    if not cfg.CONF.config_file:
        sys.exit(_("ERROR: Unable to find configuration file via the default"
                   " search paths (~/.neutron/, ~/, /etc/neutron/, /etc/) and"
                   " the '--config-file' option!"))


def boot_server(server_func):
    _init_configuration()
    try:
        server_func()
    except KeyboardInterrupt:
        pass
    except RuntimeError as e:
        sys.exit(_("ERROR: %s") % e)


def get_application():
    _init_configuration()
    profiler.setup('neutron-server', cfg.CONF.host)
    return config.load_paste_app('neutron')

eventlet_wsgi_server() 函数在 neutron\server\wsgi_eventlet.py 文件中,该方法中一部分是 WSGI 一部分是 rpc,将 neutron api 封装成 class NeutronApiService

  1. neutron\server\wsgi_eventlet.py
def eventlet_wsgi_server():
    neutron_api = service.serve_wsgi(service.NeutronApiService)
    start_api_and_rpc_workers(neutron_api)


def start_api_and_rpc_workers(neutron_api):
    try:
        worker_launcher = service.start_all_workers()

        pool = eventlet.GreenPool()
        api_thread = pool.spawn(neutron_api.wait)
        plugin_workers_thread = pool.spawn(worker_launcher.wait)

        # api and other workers should die together. When one dies,
        # kill the other.
        api_thread.link(lambda gt: plugin_workers_thread.kill())
        plugin_workers_thread.link(lambda gt: api_thread.kill())

        pool.waitall()
    except NotImplementedError:
        LOG.info("RPC was already started in parent process by "
                 "plugin.")

        neutron_api.wait()

serve_wsgi() 函数中调用了 class NeutronApiService 中的 createf方法来创建实例,然后使用start启动服务

def serve_wsgi(cls):

    try:
        service = cls.create()
        service.start()
    except Exception:
        with excutils.save_and_reraise_exception():
            LOG.exception('Unrecoverable error: please check log '
                          'for details.')

    registry.notify(resources.PROCESS, events.BEFORE_SPAWN, service)
    return service

NeutronApiService 在文件 neutron\service.py 中,NeutronApiService 是继承了 class WsgiService,说明 neutron server 是一个 WSGI 服务

class WsgiService(object):
    """Base class for WSGI based services.

    For each api you define, you must also define these flags:
    :<api>_listen: The address on which to listen
    :<api>_listen_port: The port on which to listen

    """

    def __init__(self, app_name):
        self.app_name = app_name
        self.wsgi_app = None

    def start(self):
        self.wsgi_app = _run_wsgi(self.app_name)

    def wait(self):
        self.wsgi_app.wait()

NeutronApiService 只是简单的记录了服务名称,neutronstart 函数里面真真正的加载了 WSGI APP

def _run_wsgi(app_name):
    app = config.load_paste_app(app_name)
    if not app:
        LOG.error('No known API applications configured.')
        return
    return run_wsgi_app(app)

_run_wsgi() 函数中加载了 paste 定义的 WSGI 应用

def run_wsgi_app(app):
    server = wsgi.Server("Neutron")
    server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host,
                 workers=_get_api_workers())
    LOG.info("Neutron service started, listening on %(host)s:%(port)s",
             {'host': cfg.CONF.bind_host, 'port': cfg.CONF.bind_port})
    return server

run_wsgi_app() 该函数会启动一个 neutron server

def load_paste_app(app_name):
    """Builds and returns a WSGI app from a paste config file.

    :param app_name: Name of the application to load
    """
    loader = wsgi.Loader(cfg.CONF)
    app = loader.load_app(app_name)
    return app

wsgi.Loader() 函数从 neutron.conf 中读取deploy配置文件的路径,然后根据文件来加载 app,一般路径为etc/neutron/api-paste.ini 然后使用 deploy.loadapp 来加载 app,这个 deploy就是PasteDeploy

class Loader(object):
    """Used to load WSGI applications from paste configurations."""

    def __init__(self, conf):
        """Initialize the loader, and attempt to find the config.

        :param conf: Application config
        :returns: None

        """
        conf.register_opts(_options.wsgi_opts)
        self.config_path = None

        config_path = conf.api_paste_config
        if not os.path.isabs(config_path):
            self.config_path = conf.find_file(config_path)
        elif os.path.exists(config_path):
            self.config_path = config_path

        if not self.config_path:
            raise ConfigNotFound(path=config_path)

    def load_app(self, name):
        """Return the paste URLMap wrapped WSGI application.

        :param name: Name of the application to load.
        :returns: Paste URLMap object wrapping the requested application.
        :raises: PasteAppNotFound

        """
        try:
            LOG.debug("Loading app %(name)s from %(path)s",
                      {'name': name, 'path': self.config_path})
            return deploy.loadapp("config:%s" % self.config_path, name=name)
        except LookupError:
            LOG.exception("Couldn't lookup app: %s", name)
            raise PasteAppNotFound(name=name, path=self.config_path)

/etc/neutron/api-paste.ini

[composite:neutron]
use = egg:Paste#urlmap
/: neutronversions
/v2.0: neutronapi_v2_0

[composite:neutronapi_v2_0]
use = call:neutron.auth:pipeline_factory
noauth = cors request_id catch_errors extensions neutronapiapp_v2_0
keystone = cors request_id catch_errors authtoken keystonecontext extensions neutronapiapp_v2_0

[filter:request_id]
paste.filter_factory = oslo_middleware:RequestId.factory

[filter:catch_errors]
paste.filter_factory = oslo_middleware:CatchErrors.factory

[filter:cors]
paste.filter_factory = oslo_middleware.cors:filter_factory
oslo_config_project = neutron

[filter:keystonecontext]
paste.filter_factory = neutron.auth:NeutronKeystoneContext.factory

[filter:authtoken]
paste.filter_factory = keystonemiddleware.auth_token:filter_factory

[filter:extensions]
paste.filter_factory = neutron.api.extensions:plugin_aware_extension_middleware_factory

[app:neutronversions]
paste.app_factory = neutron.api.versions:Versions.factory

[app:neutronapiapp_v2_0]
paste.app_factory = neutron.api.v2.router:APIRouter.factory

2. 通过 paste 和 paste.deploy 加载 WSGI application

paste 配置文件是由一个个 section 构成的,下面对 api-paste.ini 配置文件做一定的解释:

每个 section 格式: [type:name]

type 有以下几种类型:

  • 应用:app,application
  • 过滤器:filter,filte-app
  • 管道:pipeline
  • 组合:composite
[composite:neutron]
use = egg:Paste#urlmap
/: neutronversions
/v2.0: neutronapi_v2_0

[composite:neutron] 是一个 sectiion ,表示这是一个组合类型配置,名字是 neutron ,request 请求进来会先通过第一个 section ,表示要将一个 http request 调度到一个或者多个 application 上

使用 key use 表明将使用 Paste agg 包中的 paste.urlmap 这个中间件功能,这个中间件可以根据不同的 url 请求前缀路由给不同的 WSGI 应用

/ 配置表示将对 / 的访问路由到 neutronversions 这个 app

/v2.0 配置表示将对 /v2.0 的访问路由交给 neutronapi_v2_0 这个 app 处理

use 可以使用以下几种形式

  • egg : 使用一个 URL 指定的 egg 包中的对象
  • call : 使用某个模块中的可调用对象
  • config : 使用另外一个配置文件
[app:neutronversions]
paste.app_factory = neutron.api.versions:Versions.factory

使用 key paste.app_factory 表示后面是一个工厂函数,指明了加载的模块和方法

neutron\api\versions.py:

class Versions(object): 

    @classmethod
    def factory(cls, global_config, **local_config):
        if cfg.CONF.web_framework == 'pecan':
            return pecan_app.versions_factory(global_config, **local_config)
        return cls(app=None)

    @webob.dec.wsgify(RequestClass=wsgi.Request)
    def __call__(self, req):
        """Respond to a request for all Neutron API versions."""
        version_objs = [
            {
                "id": "v2.0",
                "status": "CURRENT",
            },
        ]

        if req.path != '/':
            if self.app:
                return req.get_response(self.app)
            language = req.best_match_language()
            msg = _('Unknown API version specified')
            msg = oslo_i18n.translate(msg, language)
            return webob.exc.HTTPNotFound(explanation=msg)

        builder = versions_view.get_view_builder(req)
        versions = [builder.build(version) for version in version_objs]
        response = dict(versions=versions)
        metadata = {}

        content_type = req.best_match_content_type()
        body = (wsgi.Serializer(metadata=metadata).
                serialize(response, content_type))

        response = webob.Response()
        response.content_type = content_type
        response.body = wsgi.encode_body(body)

        return response

    def __init__(self, app):
        self.app = app

该模块使用了工厂方法来构造对应的 WSGI 应用对象,这样对于访问/的 URL 就会交给 factory 方法构造一个 callable 应用对象,该对象有一个 __call__ 方法,就是在处理发送方请求时调用的方法

这里 @webob.dec.wsgify(RequestClass=wsgi.Request) 使用了装饰器,webob

[composite:neutronapi_v2_0]
use = call:neutron.auth:pipeline_factory
noauth = cors request_id catch_errors extensions neutronapiapp_v2_0
keystone = cors request_id catch_errors authtoken keystonecontext extensions neutronapiapp_v2_0

这个 section 用来处理访问 URL /v2.0 访问的 application,use = call:neutron.auth:pipeline_factory,根据 value 判断这是一个 call 类型的值,说明使用一个 callable 对象来构造 WSGI 应用

neutron\auth.py:

class NeutronKeystoneContext(base.ConfigurableMiddleware):
    """Make a request context from keystone headers."""

    @webob.dec.wsgify
    def __call__(self, req):
        ctx = context.Context.from_environ(req.environ)

        if not ctx.user_id:
            LOG.debug("X_USER_ID is not found in request")
            return webob.exc.HTTPUnauthorized()

        # Inject the context...
        req.environ['neutron.context'] = ctx

        return self.application


def pipeline_factory(loader, global_conf, **local_conf):
    """Create a paste pipeline based on the 'auth_strategy' config option."""
    pipeline = local_conf[cfg.CONF.auth_strategy]
    pipeline = pipeline.split()
    filters = [loader.get_filter(n) for n in pipeline[:-1]]
    app = loader.get_app(pipeline[-1])
    filters.reverse()
    for filter in filters:
        app = filter(app)
    return app

该方法会根据不同的配置选择不同的授权策略,noauthkeystone 然后读取不同的 filter 和 app, cors request_id catch_errors authtoken keystonecontext extensions neutronapiapp_v2_0 这是一个 filter 列表,一般默认为 keystone 所以一般对于访问 URL 为 v2.0 都会对 neutronapiapp_v2_0 这个 app 经过所有的 filter 后返回

# cors request_id catch_errors authtoken keystonecontext extensions neutronapiapp_v2_0
[filter:cors]
paste.filter_factory = oslo_middleware.cors:filter_factory
oslo_config_project = neutron

[filter:request_id]
paste.filter_factory = oslo_middleware:RequestId.factory

[filter:catch_errors]
paste.filter_factory = oslo_middleware:CatchErrors.factory

[filter:authtoken]
paste.filter_factory = keystonemiddleware.auth_token:filter_factory

[filter:keystonecontext]
paste.filter_factory = neutron.auth:NeutronKeystoneContext.factory

[filter:extensions]
paste.filter_factory = neutron.api.extensions:plugin_aware_extension_middleware_factory

[app:neutronapiapp_v2_0]
paste.app_factory = neutron.api.v2.router:APIRouter.factory

filterapp 的 key 有两种格式 paste.filter_factorypaste.app_factory , filter_factory 和 app_factory 是非常类似的,filter_factory 返回 filter 而不是 WSGI app ,返回的 filter 必须是 callable , 接受 WSGI app 作为唯一参数,即:返回处理过的 application。

[filter:extensions] –> neutron\api\extensions.py:

def plugin_aware_extension_middleware_factory(global_config, **local_config):
    """Paste factory."""
    def _factory(app):
        ext_mgr = PluginAwareExtensionManager.get_instance()
        return ExtensionMiddleware(app, ext_mgr=ext_mgr)
    return _factory

可以发现 filter 接受一个 app 返回一个处理后的 app

filter_factory 是最常见 factory ,接受配置参数,用来返回一个 WSGI 应用,全局配置参数通过字典形式传入,局部配置参数通过 关键字参数 传入

3. URL 如何路由到具体的 applictaion

对于访问 URL 为 /v2.0 为前缀的请求,最终都交由 neutron.api.v2.router:APIRouter.factory 来负责实例化对象,这个工厂又是如何将不同的具体的请求发送给不同的 application 的呢?这里就需要介绍 routerspecan 这两个库了

paste.app_factory –> neutron\api\v2\router.py :


class APIRouter(base_wsgi.Router):

    @classmethod
    def factory(cls, global_config, **local_config):
        if cfg.CONF.web_framework == 'pecan':
            return pecan_app.v2_factory(global_config, **local_config)
        return cls(**local_config)

    def __init__(self, **local_config):
        mapper = routes_mapper.Mapper()
        manager.init()
        plugin = directory.get_plugin()
        ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
        ext_mgr.extend_resources("2.0", attributes.RESOURCE_ATTRIBUTE_MAP)

        col_kwargs = dict(collection_actions=COLLECTION_ACTIONS,
                          member_actions=MEMBER_ACTIONS)

        def _map_resource(collection, resource, params, parent=None):
            allow_bulk = cfg.CONF.allow_bulk
            controller = base.create_resource(
                collection, resource, plugin, params, allow_bulk=allow_bulk,
                parent=parent, allow_pagination=True,
                allow_sorting=True)
            path_prefix = None
            if parent:
                path_prefix = "/%s/{%s_id}/%s" % (parent['collection_name'],
                                                  parent['member_name'],
                                                  collection)
            mapper_kwargs = dict(controller=controller,
                                 requirements=REQUIREMENTS,
                                 path_prefix=path_prefix,
                                 **col_kwargs)
            return mapper.collection(collection, resource,
                                     **mapper_kwargs)

        mapper.connect('index', '/', controller=Index(RESOURCES))
        for resource in RESOURCES:
            _map_resource(RESOURCES[resource], resource,
                          attributes.RESOURCE_ATTRIBUTE_MAP.get(
                              RESOURCES[resource], dict()))
            resource_registry.register_resource_by_name(resource)

        for resource in SUB_RESOURCES:
            _map_resource(SUB_RESOURCES[resource]['collection_name'], resource,
                          attributes.RESOURCE_ATTRIBUTE_MAP.get(
                              SUB_RESOURCES[resource]['collection_name'],
                              dict()),
                          SUB_RESOURCES[resource]['parent'])

        # Certain policy checks require that the extensions are loaded
        # and the RESOURCE_ATTRIBUTE_MAP populated before they can be
        # properly initialized. This can only be claimed with certainty
        # once this point in the code has been reached. In the event
        # that the policies have been initialized before this point,
        # calling reset will cause the next policy check to
        # re-initialize with all of the required data in place.
        policy.reset()
        super(APIRouter, self).__init__(mapper)
        

可以看到如果配置文件使用 pecan 则会使用 v2_factory() 函数,即:pecan 库来实现 application 的路由,默认则会使用 routers 库实现具体的 applicatiion 进行路由 ,pecan 暂时先不看,对于如何使用 routers 进行路由需要看 __init__() 中具体是如何实现的

__init__() 中可以看到构造 mapper 映射主要是通过内部 _map_resource 函数实现,通过遍历 RESOURCESSUB_RESOURCES 来添加 routers 对于不同的 URL 请求,通过 base.create_resource() 来创建相应的 controller

neutron\api\v2\base.py :

def create_resource(collection, resource, plugin, params, allow_bulk=False,
                    member_actions=None, parent=None, allow_pagination=False,
                    allow_sorting=False):
    controller = Controller(plugin, collection, resource, params, allow_bulk,
                            member_actions=member_actions, parent=parent,
                            allow_pagination=allow_pagination,
                            allow_sorting=allow_sorting)

    return wsgi_resource.Resource(controller, FAULT_MAP)

该方法动态的处理不同的 URL 所需的 controller ,collectionmember_actions 决定了 API 所支持的动作

COLLECTION_ACTIONS = ['index', 'create']
MEMBER_ACTIONS = ['show', 'update', 'delete']

最后介绍一下 webob,该库提供装饰器来将我们的函数包装成 WSGI 应用

neutron\api\versions.py :(对于 / 的路由处理)

	@webob.dec.wsgify(RequestClass=wsgi.Request)
    def __call__(self, req):
        """Respond to a request for all Neutron API versions."""
        version_objs = [
            {
                "id": "v2.0",
                "status": "CURRENT",
            },
        ]

        if req.path != '/':
            if self.app:
                return req.get_response(self.app)
            language = req.best_match_language()
            msg = _('Unknown API version specified')
            msg = oslo_i18n.translate(msg, language)
            return webob.exc.HTTPNotFound(explanation=msg)

        builder = versions_view.get_view_builder(req)
        versions = [builder.build(version) for version in version_objs]
        response = dict(versions=versions)
        metadata = {}

        content_type = req.best_match_content_type()
        body = (wsgi.Serializer(metadata=metadata).
                serialize(response, content_type))

        response = webob.Response()
        response.content_type = content_type
        response.body = wsgi.encode_body(body)

        return response

@webob.dec.wsgify(RequestClass=wsgi.Request) 其作用是将函数封装成符合 WSGI 规范的 application ,这样调用 __call__ 方法时,就会按照如下方式调用:

app = obj(environ, start_response) ,obj 是一个 Versions 对象

4. 创建 network 流程

上面讲到会根据不同的 URL 动态的创建 controller ,具体的创建由 Resource 实现

neutron\api\v2\resource.py :

def Resource(controller, faults=None, deserializers=None, serializers=None,
             action_status=None):
    """Represents an API entity resource and the associated serialization and
    deserialization logic
    """
    default_deserializers = {'application/json': wsgi.JSONDeserializer()}
    default_serializers = {'application/json': wsgi.JSONDictSerializer()}
    format_types = {'json': 'application/json'}
    action_status = action_status or dict(create=201, delete=204)

    default_deserializers.update(deserializers or {})
    default_serializers.update(serializers or {})

    deserializers = default_deserializers
    serializers = default_serializers
    faults = faults or {}

    @webob.dec.wsgify(RequestClass=Request)
    def resource(request):
        route_args = request.environ.get('wsgiorg.routing_args')
        if route_args:
            args = route_args[1].copy()
        else:
            args = {}

        # NOTE(jkoelker) by now the controller is already found, remove
        #                it from the args if it is in the matchdict
        args.pop('controller', None)
        fmt = args.pop('format', None)
        action = args.pop('action', None)
        content_type = format_types.get(fmt,
                                        request.best_match_content_type())
        language = request.best_match_language()
        deserializer = deserializers.get(content_type)
        serializer = serializers.get(content_type)

        try:
            if request.body:
                args['body'] = deserializer.deserialize(request.body)['body']

            # Routes library is dumb and cuts off everything after last dot (.)
            # as format. At the same time, it doesn't enforce format suffix,
            # which combined makes it impossible to pass a 'id' with dots
            # included (the last section after the last dot is lost). This is
            # important for some API extensions like tags where the id is
            # really a tag name that can contain special characters.
            #
            # To work around the Routes behaviour, we will attach the suffix
            # back to id if it's not one of supported formats (atm json only).
            # This of course won't work for the corner case of a tag name that
            # actually ends with '.json', but there seems to be no better way
            # to tackle it without breaking API backwards compatibility.
            if fmt is not None and fmt not in format_types:
                args['id'] = '.'.join([args['id'], fmt])

            revision_number = api_common.check_request_for_revision_constraint(
                request)
            if revision_number is not None:
                request.context.set_transaction_constraint(
                    controller._collection, args['id'], revision_number)

            method = getattr(controller, action)
            result = method(request=request, **args)
        except Exception as e:
            mapped_exc = api_common.convert_exception_to_http_exc(e, faults,
                                                                  language)
            if hasattr(mapped_exc, 'code') and 400 <= mapped_exc.code < 500:
                LOG.info('%(action)s failed (client error): %(exc)s',
                         {'action': action, 'exc': mapped_exc})
            else:
                LOG.exception('%(action)s failed: %(details)s',
                              {
                                  'action': action,
                                  'details': utils.extract_exc_details(e),
                              }
                              )
            raise mapped_exc

        status = action_status.get(action, 200)
        body = serializer.serialize(result)
        # NOTE(jkoelker) Comply with RFC2616 section 9.7
        if status == 204:
            content_type = ''
            body = None

        return webob.Response(request=request, status=status,
                              content_type=content_type,
                              body=body)
    # NOTE(blogan): this is something that is needed for the transition to
    # pecan.  This will allow the pecan code to have a handle on the controller
    # for an extension so it can reuse the code instead of forcing every
    # extension to rewrite the code for use with pecan.
    setattr(resource, 'controller', controller)
    setattr(resource, 'action_status', action_status)
    return resource

所有的请求都会先交给 resource 进行处理,反序列化和获取请求参数之后,再交由 controller 处理

action = args.pop('action', None)

method = getattr(controller, action)

result = method(request=request, **args)

对于具体的请求 /networks /subnets /subnetpools /ports 最终都会交给对应的 action 函数处理,以 ` create_network ` 为例:

生成对应的 controller 后,会调用 controller 的 create方法

neutron\api\v2\base.py

    def create(self, request, body=None, **kwargs):
        self._notifier.info(request.context,
                            self._resource + '.create.start',
                            body)
        return self._create(request, body, **kwargs)

然后调用 _create 函数:

neutron\api\v2\base.py

	@db_api.retry_db_errors
    def _create(self, request, body, **kwargs):
        """Creates a new instance of the requested entity."""
        parent_id = kwargs.get(self._parent_id_name)
        body = Controller.prepare_request_body(request.context,
                                               body, True,
                                               self._resource, self._attr_info,
                                               allow_bulk=self._allow_bulk)
        action = self._plugin_handlers[self.CREATE]
        # Check authz
        if self._collection in body:
            # Have to account for bulk create
            items = body[self._collection]
        else:
            items = [body]
        # Ensure policy engine is initialized
        policy.init()
        # Store requested resource amounts grouping them by tenant
        # This won't work with multiple resources. However because of the
        # current structure of this controller there will hardly be more than
        # one resource for which reservations are being made
        request_deltas = collections.defaultdict(int)
        for item in items:
            self._validate_network_tenant_ownership(request,
                                                    item[self._resource])
            policy.enforce(request.context,
                           action,
                           item[self._resource],
                           pluralized=self._collection)
            if 'tenant_id' not in item[self._resource]:
                # no tenant_id - no quota check
                continue
            tenant_id = item[self._resource]['tenant_id']
            request_deltas[tenant_id] += 1
        # Quota enforcement
        reservations = []
        try:
            for (tenant, delta) in request_deltas.items():
                reservation = quota.QUOTAS.make_reservation(
                    request.context,
                    tenant,
                    {self._resource: delta},
                    self._plugin)
                reservations.append(reservation)
        except n_exc.QuotaResourceUnknown as e:
            # We don't want to quota this resource
            LOG.debug(e)

        def notify(create_result):
            # Ensure usage trackers for all resources affected by this API
            # operation are marked as dirty
            with db_api.context_manager.writer.using(request.context):
                # Commit the reservation(s)
                for reservation in reservations:
                    quota.QUOTAS.commit_reservation(
                        request.context, reservation.reservation_id)
                resource_registry.set_resources_dirty(request.context)

            notifier_method = self._resource + '.create.end'
            self._notifier.info(request.context,
                                notifier_method,
                                create_result)
            registry.notify(self._resource, events.BEFORE_RESPONSE, self,
                            context=request.context, data=create_result,
                            method_name=notifier_method,
                            collection=self._collection,
                            action=action, original={})
            return create_result

        def do_create(body, bulk=False, emulated=False):
            kwargs = {self._parent_id_name: parent_id} if parent_id else {}
            if bulk and not emulated:
                obj_creator = getattr(self._plugin, "%s_bulk" % action)
            else:
                obj_creator = getattr(self._plugin, action)
            try:
                if emulated:
                    return self._emulate_bulk_create(obj_creator, request,
                                                     body, parent_id)
                else:
                    if self._collection in body:
                        # This is weird but fixing it requires changes to the
                        # plugin interface
                        kwargs.update({self._collection: body})
                    else:
                        kwargs.update({self._resource: body})
                    return obj_creator(request.context, **kwargs)
            except Exception:
                # In case of failure the plugin will always raise an
                # exception. Cancel the reservation
                with excutils.save_and_reraise_exception():
                    for reservation in reservations:
                        quota.QUOTAS.cancel_reservation(
                            request.context, reservation.reservation_id)

        if self._collection in body and self._native_bulk:
            # plugin does atomic bulk create operations
            objs = do_create(body, bulk=True)
            # Use first element of list to discriminate attributes which
            # should be removed because of authZ policies
            fields_to_strip = self._exclude_attributes_by_policy(
                request.context, objs[0])
            return notify({self._collection: [self._filter_attributes(
                obj, fields_to_strip=fields_to_strip)
                for obj in objs]})
        else:
            if self._collection in body:
                # Emulate atomic bulk behavior
                objs = do_create(body, bulk=True, emulated=True)
                return notify({self._collection: objs})
            else:
                obj = do_create(body)
                return notify({self._resource: self._view(request.context,
                                                          obj)})

在该函数内部会从 plugin 里面取出操作映射的 action ,action = self._plugin_handlers[self.CREATE] 这个映射是在 controller 的构造函数中创建的

neutron\api\v2\base.py :

self._plugin_handlers = {
            self.LIST: 'get%s_%s' % (parent_part, self._collection),
            self.SHOW: 'get%s_%s' % (parent_part, self._resource)
        }
 for action in [self.CREATE, self.UPDATE, self.DELETE]:
      self._plugin_handlers[action] = '%s%s_%s' % (action, parent_part,
                                                         self._resource)

self._resourcenetwork port 这些 resource 时,对应的 create 方法就是 create_network,create_port,从源码可以看到,在 _create() 方法中,调用了 do_create () 方法

neutron\api\v2\base.py :

        def do_create(body, bulk=False, emulated=False):
            kwargs = {self._parent_id_name: parent_id} if parent_id else {}
            if bulk and not emulated:
                obj_creator = getattr(self._plugin, "%s_bulk" % action)
            else:
                obj_creator = getattr(self._plugin, action)
            try:
                if emulated:
                    return self._emulate_bulk_create(obj_creator, request,
                                                     body, parent_id)
                else:
                    if self._collection in body:
                        # This is weird but fixing it requires changes to the
                        # plugin interface
                        kwargs.update({self._collection: body})
                    else:
                        kwargs.update({self._resource: body})
                    return obj_creator(request.context, **kwargs)
            except Exception:
                # In case of failure the plugin will always raise an
                # exception. Cancel the reservation
                with excutils.save_and_reraise_exception():
                    for reservation in reservations:
                        quota.QUOTAS.cancel_reservation(
                            request.context, reservation.reservation_id)

会从 self._plugin 里面获取对应的 action,这个 _plugin 就是核心插件 Ml2Plugin,因此所有的核心操作最终都会交给 Ml2Plugin 里面对应的 create_network ,create_port 等方法执行具体逻辑,即:核心资源的创建、删除 操作最终都是交给 Ml2Plugin 实现

neutron\plugins\ml2\plugin.py :

    def __init__(self):
        # First load drivers, then initialize DB, then initialize drivers
        self.type_manager = managers.TypeManager()
        self.extension_manager = managers.ExtensionManager()
        self.mechanism_manager = managers.MechanismManager()
        super(Ml2Plugin, self).__init__()
        self.type_manager.initialize()
        self.extension_manager.initialize()
        self.mechanism_manager.initialize()
        self._setup_dhcp()
        self._start_rpc_notifiers()
        self.add_agent_status_check_worker(self.agent_health_check)
        self.add_workers(self.mechanism_manager.get_workers())
        self._verify_service_plugins_requirements()
        LOG.info("Modular L2 Plugin initialization complete")

plugin 中首先初始化了 type_manager extension_manager mechanism_manager 这几个管理器,type_managermechanism_manager 分别用来管理 typemechanism , 不同的网络拓扑类型对应着不同的 Type Driver 但是网络的实现机制对应着 Mechanism Driver 。这两个管理器都是通过 stevedor 进行管理的,这样就可以像使用标准库一样来使用 Type 和 Mechanism Driver

其中 Type 插件的加载会以 neutron.ml2.type_drivers 作为命名空间,Mechanism 插件的加载会以 neutron.ml2.mechanism_drivers 作为命名空间,实际上 Ml2Plugin 的不同操作会交给不同的 type ,mechanism 插件处理

neutron\plugins\ml2\plugin.py:

    def create_network(self, context, network):
        self._before_create_network(context, network)
        result, mech_context = self._create_network_db(context, network)
        return self._after_create_network(context, result, mech_context)
    
     def _after_create_network(self, context, result, mech_context):
        kwargs = {'context': context, 'network': result}
        registry.notify(resources.NETWORK, events.AFTER_CREATE, self, **kwargs)
        try:
            self.mechanism_manager.create_network_postcommit(mech_context)
        except ml2_exc.MechanismDriverError:
            with excutils.save_and_reraise_exception():
                LOG.error("mechanism_manager.create_network_postcommit "
                          "failed, deleting network '%s'", result['id'])
                self.delete_network(context, result['id'])

        return result

通过源码可以看到,创建网络最终会交给 mechanism_manager 处理

APIRouter 流程 resource –> Controller –> Ml2Plugin –> Type , Mechanism 这样我们只用实现具体的 type 和 mechanism Driver

Reference1»

Reference2»

Reference3»

Search

    Table of Contents