在这篇文章中,我们将带领您了解AirflowDAG序列化:TypeError:“V1Pod”类型的对象不是JSON可序列化的的全貌,包括object序列化的相关情况。同时,我们还将为您介绍有关Airf
在这篇文章中,我们将带领您了解Airflow DAG 序列化:TypeError:“V1Pod”类型的对象不是 JSON 可序列化的的全貌,包括object序列化的相关情况。同时,我们还将为您介绍有关Airflow 2.0 - 调度程序无法在 serialized_dag 表中找到序列化的 DAG、Airflow:TypeError an integer is required (got type NoneType) 一次诡异问题排查、API Gmail:renturns错误“字节类型的对象不可JSON序列化”、Flask API TypeError:“ Response”类型的对象不可JSON序列化的知识,以帮助您更好地理解这个主题。
本文目录一览:- Airflow DAG 序列化:TypeError:“V1Pod”类型的对象不是 JSON 可序列化的(object序列化)
- Airflow 2.0 - 调度程序无法在 serialized_dag 表中找到序列化的 DAG
- Airflow:TypeError an integer is required (got type NoneType) 一次诡异问题排查
- API Gmail:renturns错误“字节类型的对象不可JSON序列化”
- Flask API TypeError:“ Response”类型的对象不可JSON序列化
Airflow DAG 序列化:TypeError:“V1Pod”类型的对象不是 JSON 可序列化的(object序列化)
这是 Airflow 2.0.0 中已修复的错误(请参阅 PR)
Airflow 2.0 - 调度程序无法在 serialized_dag 表中找到序列化的 DAG
按以下顺序更新后,我们遇到了同样的问题:
- 1.10.12 -> 1.10.14
- 1.10.14 -> 2.0.0
我已经按照他们的指南进行了操作,直到几个小时后调度程序开始崩溃,抱怨在数据库中找不到随机 DAG,我们才遇到任何问题。
我们的部署过程包括清除 /opt/airflow/dags
文件夹并每次都进行全新安装(我们将 dag 和支持代码存储在 python 包中)
因此,在 1.10.x 版本中,我们时不时会遇到调度程序解析空文件夹并从数据库中擦除序列化 dag 的情况,但它始终能够在下次解析时恢复图片
显然在 2.0 中,作为使调度程序 HA 努力的一部分,他们将 DAG 处理器和调度程序完全分离。这会导致竞争条件:
- 如果调度程序作业在 DAG 处理器更新serialized_dag 表值之前命中数据库,它什么也找不到并崩溃
- 如果运气好,上述情况不会发生,您也不会看到此异常
为了解决这个问题,我通过更新数据库中的 is_paused
禁用了所有 DAG 的调度,重新启动了调度程序,一旦它生成了序列化的 dag,就把所有的 dag 重新打开
我在 https://github.com/apache/airflow/pull/13893 中修复了这个问题,该问题将作为 Airflow 2.0.1 的一部分发布。
将于下周(2021 年 2 月 8 日 - 最有可能)发布 Airflow 2.0.1。
,没有足够的代表发表评论所以不得不留下答案,但是:
- 这是全新的 2.0 安装还是旧 1.10.x 实例的升级?和
- 你在回收名称吗?
我真的只是遇到了这个问题(我发现这个问题在谷歌上搜索,看看还有谁在同一条船上)。
就我而言,它是升级的现有 1.10.x 安装,虽然 dag 是动态生成的,但名称已被回收。我在 GUI 中单击 dag 时出错并且它正在杀死调度程序。
Turns Out(TM),使用 GUI 概览中的“垃圾桶”按钮完全删除 dags 并让它们重新生成修复它(如,问题立即消失并且在过去 30 分钟内没有再次出现)。
在我看来,我觉得动态 dag 的某些方面可能没有在 db upgrade
步骤中正确迁移,将它们清除并让它们完全重新生成可以解决问题。显然,您会丢失所有历史记录等,但(至少在我的情况下)这不一定是什么大问题。
Airflow:TypeError an integer is required (got type NoneType) 一次诡异问题排查
当使用 rabbitmq 作为 airflow 的 broker 的时候,启动 scheduler,即执行 airflow scheduler 命令的时候抛出以下异常:
Traceback (most recent call last):
File "/anaconda/anaconda3/bin/airflow", line 27, in <module>
args.func(args)
File "/anaconda/anaconda3/lib/python3.6/site-packages/airflow/bin/cli.py", line 818, in scheduler
......
......
File "/anaconda/anaconda3/lib/python3.6/site-packages/kombu/connection.py", line 494, in _ensured
return fun(*args, **kwargs)
File "/anaconda/anaconda3/lib/python3.6/site-packages/kombu/messaging.py", line 203, in _publish
mandatory=mandatory, immediate=immediate,
File "/anaconda/anaconda3/lib/python3.6/site-packages/librabbitmq/__init__.py", line 122, in basic_publish
mandatory or False, immediate or False,
TypeError: an integer is required (got type NoneType)
整体环境描述:
python3.6 + apache-airflow1.9.0 + rabbitmq 3.6
因为使用 redis 作为 broker 是可以正常运行的,但是换成 rabbitmq 之后就出现了这种情况。尝试过对 rabbitmq 降版本,对 airflow 降低版本,发现依然无解,说明并不是软件版本兼容问题。
于是进一步排查,单独使用 celery4.x 进行调试,发现 celery 可以正常运行,但是到了 airflow 下就出现问题,说明是配置问题。
但是 airflow 的官方文档中配置是相当简陋的,而源码中相关的配置又相当的多,实在无法定位。于是采用最粗暴的做法,调试,但是服务器无法远程,没有外网端口,不能远程调试,而且 airflow 并不支持 windows 平台。于是只能通过日志调试:
首先,根据异常提示,说明有配置属性为空导致了这个异常,于是在异常处加上打印日志:
$ vi /anaconda/anaconda3/lib/python3.6/site-packages/librabbitmq/__init__.py
......
......
if isinstance(body, tuple):
body, properties = body
elif isinstance(body, self.Message):
body, properties = body.body, body.properties
print("---------------------------------------")
print(self.channel_id)
print("---------------------------------------")
print(body)
print("---------------------------------------")
print(exchange)
print("---------------------------------------")
print(routing_key)
print("---------------------------------------")
print(properties)
print("---------------------------------------")
print(mandatory)
print("---------------------------------------")
print(immediate)
print("---------------------------------------")
#if properties["priority"] is None: 加上这一句后就不会抛出异常了
# properties["priority"] = 0
return self.connection._basic_publish(
self.channel_id, body, exchange, routing_key, properties,
mandatory or False, immediate or False,
)
......
......
打印结果如下:
---------------------------------------
1
---------------------------------------
b''\x80\x02}q\x00(X\x04\x00\x00\x00taskq\x01X1\x00\x00\x00airflow.executors.celery_executor.execute_commandq\x02X\x02\x00\x00\x00idq\x03X$\x00\x00\x0077410b3a-75c6-4ba0-a448-7048c029e80cq\x04X\x04\x00\x00\x00argsq\x05]q\x06X\xcc\x00\x00\x00airflow run example_passing_params_via_test_command run_this 2018-07-02T01:04:00 --local -sd /anaconda/anaconda3/lib/python3.6/site-packages/airflow/example_dags/example_passing_params_via_test_command.pyq\x07aX\x06\x00\x00\x00kwargsq\x08}q\tX\x07\x00\x00\x00retriesq\nK\x00X\x03\x00\x00\x00etaq\x0bNX\x07\x00\x00\x00expiresq\x0cNX\x03\x00\x00\x00utcq\r\x88X\t\x00\x00\x00callbacksq\x0eNX\x08\x00\x00\x00errbacksq\x0fNX\t\x00\x00\x00timelimitq\x10NN\x86q\x11X\x07\x00\x00\x00tasksetq\x12NX\x05\x00\x00\x00chordq\x13Nu.''
---------------------------------------
default
---------------------------------------
celery
---------------------------------------
{''reply_to'': ''d0552f0c-b341-30b6-9edb-fa9599715d6c'', ''correlation_id'': ''77410b3a-75c6-4ba0-a448-7048c029e80c'', ''delivery_mode'': 2, ''content_type'': ''application/x-python-serialize'', ''content_encoding'': ''binary'', ''headers'': {}, ''priority'': None}
---------------------------------------
None
---------------------------------------
None
---------------------------------------
因为 mandatory 和 immediate 是 bool 类型,且都尝试过给其设置值,但是依然报错,最后还为 None 的就剩下 priority 属性为 None 了。于是尝试在代码中对其设置一个 int 类型的值,结果再次运行并无异常,说明缺少优先级属性导致了这个问题,于是可以在 priority 属性为 None 的时候给它一个默认值:
......
......
if isinstance(body, tuple):
body, properties = body
elif isinstance(body, self.Message):
body, properties = body.body, body.properties
if properties["priority"] is None: #加上这一句后就不会抛出异常了
properties["priority"] = 0
return self.connection._basic_publish(
self.channel_id, body, exchange, routing_key, properties,
mandatory or False, immediate or False,
)
......
......
后续发现这样修改源码确实不会出现之前的问题,但是会出现
TypeError can''t pickle memoryview objects
之类的异常,后确认是 librabbitmq 2.0.0 + celery 4.x 的兼容性问题,于是选择使用 pyamqp 协议而不是使用默认的 amqp 协议,具体操作就是将 broker_url 改为如下格式:
broker_url = pyamqp://cord:123456@10.55.63.51:5672//
### transport://userid:password@hostname:port/virtual_host
而将 celery_result_backend 改为其他实现,比如 mysql:
celery_result_backend = db+mysql://af:123456@10.55.63.51/airflow
至此问题解决。
总结:
在排查问题的时候需要针对问题深入排查,而不是无根据的臆测,要针对提示去逐步排查,而不是一味无目的去尝试。
虽然该问题已解决,但是推测应该是缺失相关配置导致了这个问题,但是我加上了优先级相关的配置并不起作用,所以暂时通过修改源码修复这个问题。
API Gmail:renturns错误“字节类型的对象不可JSON序列化”
文档中存在一些错误
base64.urlsafe_b64encode(s)需要字节而不是字符串。
替换
{'raw': base64.urlsafe_b64encode(message.as_string())}
使用
{'raw': base64.urlsafe_b64encode(message.as_bytes()).decode()}
Flask API TypeError:“ Response”类型的对象不可JSON序列化
我使用Python Flask Restful API遇到问题,并且数据进入Elasticsearch,当我用Postman发布新数据时,问题是:
TypeError:“ Response”类型的对象不可JSON序列化
你能帮助我吗?
模型:
from marshmallow import Schema, fields, validateclass Person(object): def __init__(self,tcno=None,firstname=None,lastname=None,email=None,birthday=None,country=None,gender=None): self.__tcno = tcno self.__firstname = firstname self.__lastname = lastname self.__email = email self.__birthday = birthday self.__country = country self.__gender = gender def __repr__(self): return ''<Person(firstname={self.__firstname!r})>''.format(self=self)class PersonSchema(Schema): tcno = fields.Str(required=True,validate=[validate.Length(min=11, max=11)]) firstname = fields.Str(required=True) lastname = fields.Str(required=True) email = fields.Email(required=True,validate=validate.Email(error="Not a valid email")) birthday = fields.Date(required=True) country = fields.Str() gender = fields.Str()
视图:
from flask import Response, json, request, jsonify, Flaskimport requestsfrom flask_marshmallow import Marshmallowfrom flask_restful import Api, Resourcefrom Person import Person, PersonSchemaapp = Flask(__name__)api = Api(app)ma = Marshmallow(app)class Apici(Resource): def __init__(self): pass def get(self,people_id): url = "http://localhost:9200/people/person/{}".format(people_id) headers = {"Content-type": "application/json"} r = requests.get(url=url, headers=headers) json_data = json.loads(r.text) if json_data[''found''] is False: mesaj = json.dumps({"found": "False"}) resp = Response(mesaj, status=201, mimetype=''application/json'') return resp return json_data["_source"] def post(self,people_id): json_input = request.get_json() person_schema = PersonSchema() person, errors = person_schema.load(json_input) if errors: return jsonify({''errors'': errors}), 422 #result = person_schema(person) url = "http://localhost:9200/people/person/{}".format(people_id) headers = {"Content-type": "application/json"} print(url) r = requests.post(url=url, json=json_input, headers=headers) print(r) json_data = json.loads(r.text) if json_data["result"] is "Updated": message = json.loads({"result": "updated"}) resp = Response(message, status=201, mimetype=''application/json'') return resp message = json.loads({"result": "created"}) resp = Response(message, status=201, mimetype=''application/json'') return resp #jsonify(result.data) def put(self): json_input = request.get_json() person_schema = PersonSchema() person, errors = person_schema.load(json_input) if errors: return jsonify({''errors'': errors}), 422 result = person_schema(person) url = "http://localhost:9200/people/person/{}".format(request.url[-1]) headers = {"Content-type": "application/json"} r = requests.post(url=url, json=json_input, headers=headers) json_data = json.loads(r.text) if json_data["result"] is "Updated": message = json.dumps({"result": "updated"}) resp = Response(message, status=201, mimetype=''application/json'') return resp message = json.dumps({"result": "created"}) resp = Response(message, status=201, mimetype=''application/json'') return resp #jsonify(result.data) def delete(self): url = "http://localhost:9200/people/person/{}".format(request.url[-1]) headers = {"Content-type": "application/json"} r = requests.delete(url=url,headers=headers) json_data = json.loads(r.text) if json_data["result"] == "not_found": message = json.dumps({"result": "not_found"}) return Response(message, status=201, mimetype=''application/json'') message = json.dumps({"result": "deleted"}) resp = Response(message, status=201, mimetype=''application/json'') return respclass ApiciList(Resource): def __init__(self): pass def get(self): url = "http://localhost:9200/people/person/_search" body = {"query": {"match_all": {}}} headers = {"Content-type": "application/json"} r = requests.get(url=url, json=body, headers=headers) json_data = json.loads(r.text) return json_data["hits"]["hits"]api.add_resource(ApiciList, ''/person'')api.add_resource(Apici, ''/person/<string:people_id>'')if __name__ == ''__main__'': app.run(port=5010,debug=True)
错误:
127.0.0.1 - - [08/Jun/2017 11:37:18] "POST /person/1 HTTP/1.1" 500 -Traceback (most recent call last): File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1997, in __call__ return self.wsgi_app(environ, start_response) File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1985, in wsgi_app response = self.handle_exception(e) File "/usr/local/lib/python3.6/dist-packages/flask_restful/__init__.py", line 271, in error_router return original_handler(e) File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1540, in handle_exception reraise(exc_type, exc_value, tb) File "/usr/local/lib/python3.6/dist-packages/flask/_compat.py", line 32, in reraise raise value.with_traceback(tb) File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1982, in wsgi_app response = self.full_dispatch_request() File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1614, in full_dispatch_request rv = self.handle_user_exception(e) File "/usr/local/lib/python3.6/dist-packages/flask_restful/__init__.py", line 271, in error_router return original_handler(e) File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1517, in handle_user_exception reraise(exc_type, exc_value, tb) File "/usr/local/lib/python3.6/dist-packages/flask/_compat.py", line 32, in reraise raise value.with_traceback(tb) File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1612, in full_dispatch_request rv = self.dispatch_request() File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1598, in dispatch_request return self.view_functions[rule.endpoint](**req.view_args) File "/usr/local/lib/python3.6/dist-packages/flask_restful/__init__.py", line 481, in wrapper return self.make_response(data, code, headers=headers) File "/usr/local/lib/python3.6/dist-packages/flask_restful/__init__.py", line 510, in make_response resp = self.representations[mediatype](data, *args, **kwargs) File "/usr/local/lib/python3.6/dist-packages/flask_restful/representations/json.py", line 20, in output_json dumped = dumps(data, **settings) + "\n" File "/usr/lib/python3.6/json/__init__.py", line 238, in dumps **kw).encode(obj) File "/usr/lib/python3.6/json/encoder.py", line 201, in encode chunks = list(chunks) File "/usr/lib/python3.6/json/encoder.py", line 437, in _iterencode o = _default(o) File "/usr/lib/python3.6/json/encoder.py", line 180, in default o.__class__.__name__)TypeError: Object of type ''Response'' is not JSON serializable
编辑:我发现了问题。它在def post(self,people_id)方法中:
if errors: return jsonify({''errors'': errors}), 422
新队:
if errors: message = json.dumps({''errors'': errors}) return Response(message, status=422, mimetype=''application/json'')
答案1
小编典典这可以简单地通过以下方式完成:
from flask import jsonifydef myMethod(): .... response = jsonify(data) response.status_code = 200 # or 400 or whatever return response
今天关于Airflow DAG 序列化:TypeError:“V1Pod”类型的对象不是 JSON 可序列化的和object序列化的介绍到此结束,谢谢您的阅读,有关Airflow 2.0 - 调度程序无法在 serialized_dag 表中找到序列化的 DAG、Airflow:TypeError an integer is required (got type NoneType) 一次诡异问题排查、API Gmail:renturns错误“字节类型的对象不可JSON序列化”、Flask API TypeError:“ Response”类型的对象不可JSON序列化等更多相关知识的信息可以在本站进行查询。
本文标签: