GVKun编程网logo

Airflow DAG 序列化:TypeError:“V1Pod”类型的对象不是 JSON 可序列化的(object序列化)

9

在这篇文章中,我们将带领您了解AirflowDAG序列化:TypeError:“V1Pod”类型的对象不是JSON可序列化的的全貌,包括object序列化的相关情况。同时,我们还将为您介绍有关Airf

在这篇文章中,我们将带领您了解Airflow DAG 序列化:TypeError:“V1Pod”类型的对象不是 JSON 可序列化的的全貌,包括object序列化的相关情况。同时,我们还将为您介绍有关Airflow 2.0 - 调度程序无法在 serialized_dag 表中找到序列化的 DAG、Airflow:TypeError an integer is required (got type NoneType) 一次诡异问题排查、API Gmail:renturns错误“字节类型的对象不可JSON序列化”、Flask API TypeError:“ Response”类型的对象不可JSON序列化的知识,以帮助您更好地理解这个主题。

本文目录一览:

Airflow DAG 序列化:TypeError:“V1Pod”类型的对象不是 JSON 可序列化的(object序列化)

Airflow DAG 序列化:TypeError:“V1Pod”类型的对象不是 JSON 可序列化的(object序列化)

这是 Airflow 2.0.0 中已修复的错误(请参阅 PR)

Airflow 2.0 - 调度程序无法在 serialized_dag 表中找到序列化的 DAG

Airflow 2.0 - 调度程序无法在 serialized_dag 表中找到序列化的 DAG

按以下顺序更新后,我们遇到了同样的问题:

  1. 1.10.12 -> 1.10.14
  2. 1.10.14 -> 2.0.0

我已经按照他们的指南进行了操作,直到几个小时后调度程序开始崩溃,抱怨在数据库中找不到随机 DAG,我们才遇到任何问题。

我们的部署过程包括清除 /opt/airflow/dags 文件夹并每次都进行全新安装(我们将 dag 和支持代码存储在 python 包中)

因此,在 1.10.x 版本中,我们时不时会遇到调度程序解析空文件夹并从数据库中擦除序列化 dag 的情况,但它始终能够在下次解析时恢复图片

显然在 2.0 中,作为使调度程序 HA 努力的一部分,他们将 DAG 处理器和调度程序完全分离。这会导致竞争条件:

  • 如果调度程序作业在 DAG 处理器更新serialized_dag 表值之前命中数据库,它什么也找不到并崩溃
  • 如果运气好,上述情况不会发生,您也不会看到此异常

为了解决这个问题,我通过更新数据库中的 is_paused 禁用了所有 DAG 的调度,重新启动了调度程序,一旦它生成了序列化的 dag,就把所有的 dag 重新打开

,

我在 https://github.com/apache/airflow/pull/13893 中修复了这个问题,该问题将作为 Airflow 2.0.1 的一部分发布。

将于下周(2021 年 2 月 8 日 - 最有可能)发布 Airflow 2.0.1。

,

没有足够的代表发表评论所以不得不留下答案,但是:

  1. 这是全新的 2.0 安装还是旧 1.10.x 实例的升级?和
  2. 你在回收名称吗?

我真的只是遇到了这个问题(我发现这个问题在谷歌上搜索,看看还有谁在同一条船上)。

就我而言,它是升级的现有 1.10.x 安装,虽然 dag 是动态生成的,但名称已被回收。我在 GUI 中单击 dag 时出错并且它正在杀死调度程序。

Turns Out(TM),使用 GUI 概览中的“垃圾桶”按钮完全删除 dags 并让它们重新生成修复它(如,问题立即消失并且在过去 30 分钟内没有再次出现)。

在我看来,我觉得动态 dag 的某些方面可能没有在 db upgrade 步骤中正确迁移,将它们清除并让它们完全重新生成可以解决问题。显然,您会丢失所有历史记录等,但(至少在我的情况下)这不一定是什么大问题。

Airflow:TypeError an integer is required (got type NoneType) 一次诡异问题排查

Airflow:TypeError an integer is required (got type NoneType) 一次诡异问题排查

​ 当使用 rabbitmq 作为 airflow 的 broker 的时候,启动 scheduler,即执行 airflow scheduler 命令的时候抛出以下异常:

Traceback (most recent call last):
  File "/anaconda/anaconda3/bin/airflow", line 27, in <module>
    args.func(args)
  File "/anaconda/anaconda3/lib/python3.6/site-packages/airflow/bin/cli.py", line 818, in scheduler
	......
	......
  File "/anaconda/anaconda3/lib/python3.6/site-packages/kombu/connection.py", line 494, in _ensured
    return fun(*args, **kwargs)
  File "/anaconda/anaconda3/lib/python3.6/site-packages/kombu/messaging.py", line 203, in _publish
    mandatory=mandatory, immediate=immediate,
  File "/anaconda/anaconda3/lib/python3.6/site-packages/librabbitmq/__init__.py", line 122, in basic_publish
    mandatory or False, immediate or False,
TypeError: an integer is required (got type NoneType)

整体环境描述:

python3.6 + apache-airflow1.9.0 + rabbitmq 3.6

​ 因为使用 redis 作为 broker 是可以正常运行的,但是换成 rabbitmq 之后就出现了这种情况。尝试过对 rabbitmq 降版本,对 airflow 降低版本,发现依然无解,说明并不是软件版本兼容问题。

​ 于是进一步排查,单独使用 celery4.x 进行调试,发现 celery 可以正常运行,但是到了 airflow 下就出现问题,说明是配置问题。

​ 但是 airflow 的官方文档中配置是相当简陋的,而源码中相关的配置又相当的多,实在无法定位。于是采用最粗暴的做法,调试,但是服务器无法远程,没有外网端口,不能远程调试,而且 airflow 并不支持 windows 平台。于是只能通过日志调试:

首先,根据异常提示,说明有配置属性为空导致了这个异常,于是在异常处加上打印日志:

$ vi /anaconda/anaconda3/lib/python3.6/site-packages/librabbitmq/__init__.py
		......
  		......
		if isinstance(body, tuple):
            body, properties = body
        elif isinstance(body, self.Message):
            body, properties = body.body, body.properties
        print("---------------------------------------")
        print(self.channel_id)
        print("---------------------------------------")
        print(body)
        print("---------------------------------------")
        print(exchange)
        print("---------------------------------------")
        print(routing_key)
        print("---------------------------------------")
        print(properties)
        print("---------------------------------------")
        print(mandatory)
        print("---------------------------------------")
        print(immediate)
        print("---------------------------------------")
        #if properties["priority"] is None:  加上这一句后就不会抛出异常了
        #   properties["priority"] = 0        
        return self.connection._basic_publish(
            self.channel_id, body, exchange, routing_key, properties,
            mandatory or False, immediate or False,
        )
		......
        ......

打印结果如下:

---------------------------------------
1
---------------------------------------
b''\x80\x02}q\x00(X\x04\x00\x00\x00taskq\x01X1\x00\x00\x00airflow.executors.celery_executor.execute_commandq\x02X\x02\x00\x00\x00idq\x03X$\x00\x00\x0077410b3a-75c6-4ba0-a448-7048c029e80cq\x04X\x04\x00\x00\x00argsq\x05]q\x06X\xcc\x00\x00\x00airflow run example_passing_params_via_test_command run_this 2018-07-02T01:04:00 --local -sd /anaconda/anaconda3/lib/python3.6/site-packages/airflow/example_dags/example_passing_params_via_test_command.pyq\x07aX\x06\x00\x00\x00kwargsq\x08}q\tX\x07\x00\x00\x00retriesq\nK\x00X\x03\x00\x00\x00etaq\x0bNX\x07\x00\x00\x00expiresq\x0cNX\x03\x00\x00\x00utcq\r\x88X\t\x00\x00\x00callbacksq\x0eNX\x08\x00\x00\x00errbacksq\x0fNX\t\x00\x00\x00timelimitq\x10NN\x86q\x11X\x07\x00\x00\x00tasksetq\x12NX\x05\x00\x00\x00chordq\x13Nu.''
---------------------------------------
default
---------------------------------------
celery
---------------------------------------
{''reply_to'': ''d0552f0c-b341-30b6-9edb-fa9599715d6c'', ''correlation_id'': ''77410b3a-75c6-4ba0-a448-7048c029e80c'', ''delivery_mode'': 2, ''content_type'': ''application/x-python-serialize'', ''content_encoding'': ''binary'', ''headers'': {}, ''priority'': None}
---------------------------------------
None
---------------------------------------
None
---------------------------------------

因为 mandatory 和 immediate 是 bool 类型,且都尝试过给其设置值,但是依然报错,最后还为 None 的就剩下 priority 属性为 None 了。于是尝试在代码中对其设置一个 int 类型的值,结果再次运行并无异常,说明缺少优先级属性导致了这个问题,于是可以在 priority 属性为 None 的时候给它一个默认值:

		......
  		......
		if isinstance(body, tuple):
            body, properties = body
        elif isinstance(body, self.Message):
            body, properties = body.body, body.properties
        if properties["priority"] is None:  #加上这一句后就不会抛出异常了
           properties["priority"] = 0        
        return self.connection._basic_publish(
            self.channel_id, body, exchange, routing_key, properties,
            mandatory or False, immediate or False,
        )
		......
        ......

后续发现这样修改源码确实不会出现之前的问题,但是会出现

TypeError can''t pickle memoryview objects

之类的异常,后确认是 librabbitmq 2.0.0 + celery 4.x 的兼容性问题,于是选择使用 pyamqp 协议而不是使用默认的 amqp 协议,具体操作就是将 broker_url 改为如下格式:

broker_url = pyamqp://cord:123456@10.55.63.51:5672//
###  transport://userid:password@hostname:port/virtual_host

而将 celery_result_backend 改为其他实现,比如 mysql:

celery_result_backend = db+mysql://af:123456@10.55.63.51/airflow

至此问题解决。

总结:

​ 在排查问题的时候需要针对问题深入排查,而不是无根据的臆测,要针对提示去逐步排查,而不是一味无目的去尝试。

​ 虽然该问题已解决,但是推测应该是缺失相关配置导致了这个问题,但是我加上了优先级相关的配置并不起作用,所以暂时通过修改源码修复这个问题。

API Gmail:renturns错误“字节类型的对象不可JSON序列化”

API Gmail:renturns错误“字节类型的对象不可JSON序列化”

文档中存在一些错误

base64.urlsafe_b64encode(s)需要字节而不是字符串。

替换

{'raw': base64.urlsafe_b64encode(message.as_string())}

使用

{'raw': base64.urlsafe_b64encode(message.as_bytes()).decode()}

Flask API TypeError:“ Response”类型的对象不可JSON序列化

Flask API TypeError:“ Response”类型的对象不可JSON序列化

我使用Python Flask Restful API遇到问题,并且数据进入Elasticsearch,当我用Postman发布新数据时,问题是:

TypeError:“ Response”类型的对象不可JSON序列化

你能帮助我吗?

模型:

   from marshmallow import Schema, fields, validateclass Person(object):    def __init__(self,tcno=None,firstname=None,lastname=None,email=None,birthday=None,country=None,gender=None):        self.__tcno = tcno        self.__firstname = firstname        self.__lastname = lastname        self.__email = email        self.__birthday = birthday        self.__country = country        self.__gender = gender    def __repr__(self):        return ''<Person(firstname={self.__firstname!r})>''.format(self=self)class PersonSchema(Schema):    tcno = fields.Str(required=True,validate=[validate.Length(min=11, max=11)])    firstname = fields.Str(required=True)    lastname = fields.Str(required=True)    email = fields.Email(required=True,validate=validate.Email(error="Not a valid email"))    birthday = fields.Date(required=True)    country = fields.Str()    gender = fields.Str()

视图:

from flask import Response, json, request, jsonify, Flaskimport requestsfrom flask_marshmallow import Marshmallowfrom flask_restful import Api, Resourcefrom Person import Person, PersonSchemaapp = Flask(__name__)api = Api(app)ma = Marshmallow(app)class Apici(Resource):    def __init__(self):        pass    def get(self,people_id):        url = "http://localhost:9200/people/person/{}".format(people_id)        headers = {"Content-type": "application/json"}        r = requests.get(url=url, headers=headers)        json_data = json.loads(r.text)        if json_data[''found''] is False:            mesaj = json.dumps({"found": "False"})            resp = Response(mesaj, status=201, mimetype=''application/json'')            return resp        return json_data["_source"]    def post(self,people_id):        json_input = request.get_json()        person_schema = PersonSchema()        person, errors = person_schema.load(json_input)        if errors:            return jsonify({''errors'': errors}), 422        #result = person_schema(person)        url = "http://localhost:9200/people/person/{}".format(people_id)        headers = {"Content-type": "application/json"}        print(url)        r = requests.post(url=url, json=json_input, headers=headers)        print(r)        json_data = json.loads(r.text)        if json_data["result"] is "Updated":            message = json.loads({"result": "updated"})            resp = Response(message, status=201, mimetype=''application/json'')            return resp        message = json.loads({"result": "created"})        resp = Response(message, status=201, mimetype=''application/json'')        return resp #jsonify(result.data)    def put(self):        json_input = request.get_json()        person_schema = PersonSchema()        person, errors = person_schema.load(json_input)        if errors:            return jsonify({''errors'': errors}), 422        result = person_schema(person)        url = "http://localhost:9200/people/person/{}".format(request.url[-1])        headers = {"Content-type": "application/json"}        r = requests.post(url=url, json=json_input, headers=headers)        json_data = json.loads(r.text)        if json_data["result"] is "Updated":            message = json.dumps({"result": "updated"})            resp = Response(message, status=201, mimetype=''application/json'')            return resp        message = json.dumps({"result": "created"})        resp = Response(message, status=201, mimetype=''application/json'')        return resp #jsonify(result.data)    def delete(self):        url = "http://localhost:9200/people/person/{}".format(request.url[-1])        headers = {"Content-type": "application/json"}        r = requests.delete(url=url,headers=headers)        json_data = json.loads(r.text)        if json_data["result"] == "not_found":            message = json.dumps({"result": "not_found"})            return Response(message, status=201, mimetype=''application/json'')        message = json.dumps({"result": "deleted"})        resp = Response(message, status=201, mimetype=''application/json'')        return respclass ApiciList(Resource):    def __init__(self):        pass    def get(self):        url = "http://localhost:9200/people/person/_search"        body = {"query": {"match_all": {}}}        headers = {"Content-type": "application/json"}        r = requests.get(url=url, json=body, headers=headers)        json_data = json.loads(r.text)        return json_data["hits"]["hits"]api.add_resource(ApiciList, ''/person'')api.add_resource(Apici, ''/person/<string:people_id>'')if __name__ == ''__main__'':    app.run(port=5010,debug=True)

错误:

127.0.0.1 - - [08/Jun/2017 11:37:18] "POST /person/1 HTTP/1.1" 500 -Traceback (most recent call last):  File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1997, in __call__    return self.wsgi_app(environ, start_response)  File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1985, in wsgi_app    response = self.handle_exception(e)  File "/usr/local/lib/python3.6/dist-packages/flask_restful/__init__.py", line 271, in error_router    return original_handler(e)  File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1540, in handle_exception    reraise(exc_type, exc_value, tb)  File "/usr/local/lib/python3.6/dist-packages/flask/_compat.py", line 32, in reraise    raise value.with_traceback(tb)  File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1982, in wsgi_app    response = self.full_dispatch_request()  File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1614, in full_dispatch_request    rv = self.handle_user_exception(e)  File "/usr/local/lib/python3.6/dist-packages/flask_restful/__init__.py", line 271, in error_router    return original_handler(e)  File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1517, in handle_user_exception    reraise(exc_type, exc_value, tb)  File "/usr/local/lib/python3.6/dist-packages/flask/_compat.py", line 32, in reraise    raise value.with_traceback(tb)  File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1612, in full_dispatch_request    rv = self.dispatch_request()  File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1598, in dispatch_request    return self.view_functions[rule.endpoint](**req.view_args)  File "/usr/local/lib/python3.6/dist-packages/flask_restful/__init__.py", line 481, in wrapper    return self.make_response(data, code, headers=headers)  File "/usr/local/lib/python3.6/dist-packages/flask_restful/__init__.py", line 510, in make_response    resp = self.representations[mediatype](data, *args, **kwargs)  File "/usr/local/lib/python3.6/dist-packages/flask_restful/representations/json.py", line 20, in output_json    dumped = dumps(data, **settings) + "\n"  File "/usr/lib/python3.6/json/__init__.py", line 238, in dumps    **kw).encode(obj)  File "/usr/lib/python3.6/json/encoder.py", line 201, in encode    chunks = list(chunks)  File "/usr/lib/python3.6/json/encoder.py", line 437, in _iterencode    o = _default(o)  File "/usr/lib/python3.6/json/encoder.py", line 180, in default    o.__class__.__name__)TypeError: Object of type ''Response'' is not JSON serializable

编辑:我发现了问题。它在def post(self,people_id)方法中:

   if errors:        return jsonify({''errors'': errors}), 422

新队:

if errors:    message = json.dumps({''errors'': errors})    return Response(message, status=422, mimetype=''application/json'')

答案1

小编典典

这可以简单地通过以下方式完成:

from flask import jsonifydef myMethod():    ....    response = jsonify(data)    response.status_code = 200 # or 400 or whatever    return response

今天关于Airflow DAG 序列化:TypeError:“V1Pod”类型的对象不是 JSON 可序列化的object序列化的介绍到此结束,谢谢您的阅读,有关Airflow 2.0 - 调度程序无法在 serialized_dag 表中找到序列化的 DAG、Airflow:TypeError an integer is required (got type NoneType) 一次诡异问题排查、API Gmail:renturns错误“字节类型的对象不可JSON序列化”、Flask API TypeError:“ Response”类型的对象不可JSON序列化等更多相关知识的信息可以在本站进行查询。

本文标签: