飞书API(7):MySQL 入库通用版本

一、引入

在上一篇介绍了如何使用 pandas 处理飞书接口返回的数据,并将处理好的数据入库。最终的代码拓展性太差,本篇来探讨下如何使得上一篇的最终代码拓展性更好!
为什么上一篇的代码拓展性太差呢?我总结了几点:

  • 列名是硬编码(写死的)
  • pandas 和 MySQL 数据表的字段名和列名的对应关系也是硬编码
  • MySQL 数据表的字段名的数据类型也是硬编码

再抽象一层,其实就是三个对象:多维表、pandas 的 DataFrame 和MySQL 数据表直接字段和数据类型的映射关系的问题。如果单独维护该映射关系,便可以做到通用性。

在飞书端口,我们可以获取到多维表的列名和对应的数据类型。
在 pandas.DataFrame 端口的字段名,需要提供一个飞书列名的映射关系进行转换,而数据类型,在这一阶段主要是根据后续要入库的数据类型来决定,参考 MySQL 的数据类型,一般使用 pandas 默认的数据类型即可。
在 MySQL 表端口的字段名可以直接引用 pandas.DataFrame 的字段名,也方便数据的插入,而数据类型,由于同一个数据类型的数据是一致的,所以可以直接基于数据类型形成一个通用的映射关系。由于 pandas 的字段名和 MySQL 的字段名保持一致,所以列名映射关系也可以定义为飞书多维表的列名与入库表对应的英文字段名的映射关系。

综上,只需要维护两个数据关系即可:多维表的列名和 pandas.DataFrame 的字段名、多维表的数据类型和 MySQL 的数据类型。而后者基本上又是固定的,所以日常只需要维护前者即可。

分析完需求,接下来开始开发。
本文结构:介绍飞书获取字段 API、优化代码逻辑,后者包含飞书各个数据类型的通用处理方案、自动构建 SQL 和链接分析三个部分。

二、飞书API-列出字段

本次要介绍的 API 叫列出字段,它的功能就是根据 app_token 和 table_id,获取数据表的所有字段。
同样,还是调出 API 调试台,左边依次查找云文档>多维表格>字段>点击列出字段,然后获取 access_token,填写多维表的路径参数,点击开始调试,便可以看到获取到的数据内容。
image.png

每一个列名,都包含“field_id”、“field_name”、“is_primary”、“property”、“type”和“ui_type”这 6 个参数。我们重点取“field_name”和“type”。“field_name”主要用于匹配通过查询数据接口获取的列名,“ui_type”就是前面介绍的飞书数据类型编号,这个编号用于映射 MySQL 的字段的数据类型。

{"field_id": "fld9r37Stu","field_name": "自动编号","is_primary": true,"property": {"auto_serial": {"type": "auto_increment_number"}},"type": 1005,"ui_type": "AutoNumber"
}

点击示例代码>Python Requests,获取相关 Python 代码。
image.png

将代码稍微整理一下,核心字符串,如 access_token、app_token 和 table_id 通过参数进行传递,并将接口返回的字段信息数据提取出来,示例如下:

import requests, json
def get_bitable_fields(tenant_access_token, app_token, table_id, page_size=500):url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/fields?page_size={page_size}"payload = ''headers = {'Authorization': f'Bearer {tenant_access_token}'}response = requests.request("GET", url, headers=headers, data=payload)field_infos = response.json().get('data').get('items')print('成功获取飞书字段信息,关联函数:get_bitable_fields。')return field_infostenant_access_token = 'your_access_token'
app_token = 'your_app_token'
table_id = 'your_table_id'
feishu_fields = get_bitable_fields(tenant_access_token, app_token, table_id)

这里的“tenant_access_token”也可以使用前面介绍的自动获取“access_token”的方法来获取该值,再通过参数传递给tenant_access_token()方法。打印结构参考如下:
image.png

三、迭代代码逻辑

3.1 映射关系说明

映射关系主要有两个:飞书多维表的列名与入库表对应的英文字段名的映射关系、飞书数据类型与入库表对应的字段的数据类型。
给定一个列名映射关系参考如下。由于每个表的数据都不一样,所以每次有新的多维表表单需要入库时都需要指定二者的关系。
另外,由于入库字段是依赖该映射关系,也仅仅会将这里所列举的字段入库,所以当你只需要把部分列入库时,也可以仅仅指定需要入库的这部分列,而不用列举所有列。

fields_map = [{'tb_field_name': 'field_text','feishu_field_name': '文本'},{'tb_field_name': 'field_email','feishu_field_name': 'email'},{'tb_field_name': 'field_select','feishu_field_name': '单选'},{'tb_field_name': 'field_mobile','feishu_field_name': '电话号码'},{'tb_field_name': 'field_no','feishu_field_name': '自动编号'},{'tb_field_name': 'field_member1','feishu_field_name': '人员1'},{'tb_field_name': 'field_group1','feishu_field_name': '群组1'},{'tb_field_name': 'field_creator','feishu_field_name': '创建人'},{'tb_field_name': 'field_modifier','feishu_field_name': '修改人'},{'tb_field_name': 'field_member2','feishu_field_name': '人员2'},{'tb_field_name': 'field_group2','feishu_field_name': '群组2'},{'tb_field_name': 'field_url','feishu_field_name': '超链接'},{'tb_field_name': 'field_location','feishu_field_name': '地理位置'},{'tb_field_name': 'field_findnum','feishu_field_name': '查找引用数值'},{'tb_field_name': 'field_numformula','feishu_field_name': '数字公式'},{'tb_field_name': 'field_number','feishu_field_name': '数字'},{'tb_field_name': 'field_progress','feishu_field_name': '进度'},{'tb_field_name': 'field_money','feishu_field_name': '货币'},{'tb_field_name': 'field_Rating','feishu_field_name': '评分'},{'tb_field_name': 'field_bool','feishu_field_name': '复选框'},{'tb_field_name': 'field_date','feishu_field_name': '日期'},{'tb_field_name': 'field_createdtime','feishu_field_name': '创建时间'},{'tb_field_name': 'field_updatedtime','feishu_field_name': '更新时间'},{'tb_field_name': 'field_mulselect','feishu_field_name': '多选'},{'tb_field_name': 'field_singleunion','feishu_field_name': '单向关联'},{'tb_field_name': 'field_doubleunion','feishu_field_name': '双向关联'},{'tb_field_name': 'field_file','feishu_field_name': '附件'}]

给定一个数据类型映射关系,该映射表会固定下来,作为默认映射关系处理所有需要入库的飞书数据。

data_type_map = [{"feishu_type": 1   ,"mysql_type": "varchar(256)"   },{"feishu_type": 2   ,"mysql_type": "FLOAT"          },{"feishu_type": 3   ,"mysql_type": "varchar(256)"   },{"feishu_type": 4   ,"mysql_type": "json"           },{"feishu_type": 5   ,"mysql_type": "datetime"       },{"feishu_type": 7   ,"mysql_type": "BOOL"           },{"feishu_type": 11  ,"mysql_type": "varchar(256)"   },{"feishu_type": 13  ,"mysql_type": "varchar(256)"   },{"feishu_type": 15  ,"mysql_type": "varchar(256)"   },{"feishu_type": 17  ,"mysql_type": "varchar(256)"   },{"feishu_type": 18  ,"mysql_type": "json"           },{"feishu_type": 19  ,"mysql_type": "varchar(256)"   },{"feishu_type": 20  ,"mysql_type": "varchar(256)"   },{"feishu_type": 21  ,"mysql_type": "json"           },{"feishu_type": 22  ,"mysql_type": "varchar(256)"   },{"feishu_type": 23  ,"mysql_type": "varchar(256)"   },{"feishu_type": 1001,"mysql_type": "datetime"       },{"feishu_type": 1002,"mysql_type": "datetime"       },{"feishu_type": 1003,"mysql_type": "varchar(256)"   },{"feishu_type": 1004,"mysql_type": "varchar(256)"   },{"feishu_type": 1005,"mysql_type": "varchar(256)"   }]

3.2 迭代入库代码逻辑

迭代内容主要包含两块:处理各个飞书数据类型编号的列返回的数据和构建建表 SQL。

3.2.1 迭代一:处理飞书各个数据类型的数据

处理数据涉及飞书各个列的数据类型,需要先把飞书列出字段 API 返回的列数据和列名映射关系关联,给每一列数据都加上飞书的数据类型编号,以便根据对应的处理方案进行数据的处理。
取上面 API 的代码和“fields_map”变量,将 API 处理后的“feishu_fields”列表和“fields_map”列表都转为 pandas 的 DataFrame,然后以“fields_map”对应的 DataFrame 为主表左连接“feishu_fields”对应的 DataFrame,类似 MySQL 的“left join”。这里把两个列表转为 DataFrame 的功能封装为函数“merge_list”。最终参考代码如下:

import requests, json 
import pandas as pddef get_tenant_access_token(app_id, app_secret):url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"payload = json.dumps({"app_id": app_id,"app_secret": app_secret})headers = {'Content-Type': 'application/json'}response = requests.request("POST", url, headers=headers, data=payload)# print(response.text)return response.json()['tenant_access_token']def get_bitable_fields(tenant_access_token, app_token, table_id, page_size=500):url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/fields?page_size={page_size}"payload = ''headers = {'Authorization': f'Bearer {tenant_access_token}'}response = requests.request("GET", url, headers=headers, data=payload)field_infos = response.json().get('data').get('items')return field_infosapp_id = 'your_app_id'
app_secret = 'your_app_secret'
tenant_access_token = get_tenant_access_token(app_id, app_secret)
app_token = 'your_app_token'
table_id = 'your_table_id'feishu_fields = get_bitable_fields(tenant_access_token, app_token, table_id)fields_map = [{'tb_field_name': 'field_text','feishu_field_name': '文本'},{'tb_field_name': 'field_email','feishu_field_name': 'email'},{'tb_field_name': 'field_select','feishu_field_name': '单选'},{'tb_field_name': 'field_mobile','feishu_field_name': '电话号码'},{'tb_field_name': 'field_no','feishu_field_name': '自动编号'},{'tb_field_name': 'field_member1','feishu_field_name': '人员1'},{'tb_field_name': 'field_group1','feishu_field_name': '群组1'},{'tb_field_name': 'field_creator','feishu_field_name': '创建人'},{'tb_field_name': 'field_modifier','feishu_field_name': '修改人'},{'tb_field_name': 'field_member2','feishu_field_name': '人员2'},{'tb_field_name': 'field_group2','feishu_field_name': '群组2'},{'tb_field_name': 'field_url','feishu_field_name': '超链接'},{'tb_field_name': 'field_location','feishu_field_name': '地理位置'},{'tb_field_name': 'field_findnum','feishu_field_name': '查找引用数值'},{'tb_field_name': 'field_numformula','feishu_field_name': '数字公式'},{'tb_field_name': 'field_number','feishu_field_name': '数字'},{'tb_field_name': 'field_progress','feishu_field_name': '进度'},{'tb_field_name': 'field_money','feishu_field_name': '货币'},{'tb_field_name': 'field_Rating','feishu_field_name': '评分'},{'tb_field_name': 'field_bool','feishu_field_name': '复选框'},{'tb_field_name': 'field_date','feishu_field_name': '日期'},{'tb_field_name': 'field_createdtime','feishu_field_name': '创建时间'},{'tb_field_name': 'field_updatedtime','feishu_field_name': '更新时间'},{'tb_field_name': 'field_mulselect','feishu_field_name': '多选'},{'tb_field_name': 'field_singleunion','feishu_field_name': '单向关联'},{'tb_field_name': 'field_doubleunion','feishu_field_name': '双向关联'},{'tb_field_name': 'field_file','feishu_field_name': '附件'}]# 将两个[{},{}]结构的数据合并
def merge_list(ls_from, ls_join, on=None, left_on=None, right_on=None):df_from = pd.DataFrame(ls_from)df_join = pd.DataFrame(ls_join)if on is not None:df_merge = df_from.merge(df_join, how='left', on=on)else:df_merge = df_from.merge(df_join, how='left', left_on=left_on, right_on=right_on) # , suffixes=('', '_y')return df_merge# 以 fields_map 为准
store_fields_info_df = merge_list(fields_map, feishu_fields, left_on='feishu_field_name', right_on='field_name')
store_fields_info_df

执行结果如下,关键的三个字段名已标红,分别为:“tb_field_name”、“feishu_field_name”和“type”。
image.png

获取到每个字段名和类型之后,接下来开始迭代上一篇讲到的extract_key_fields()函数,将写固定列名的硬代码改为通过飞书数据类型识别的代码,以兼容各个数据类型。
代码如下,使用iterrows()遍历字段信息,然后对字段的数据类型编号进行判断,判断规则如下:

  • 如果是编号 1(文本),取字段值中的“text”文本;
  • 如果是编号 2(数字)、3(单选)、7(复选框)、13(手机号)、1005(自动编号),则直接取字段值返回;
  • 如果是编号 5(日期)、1001(创建日期)、1002(更新日期),取字段值加 8 小时,再转为 datetime 类型;
  • 如果是编号 11(人员)、23(群组)、1003(创建人)、1004(修改人),遍历元素,取字段值中的“name”文本,然后用逗号连接起来;
  • 如果是编号 15(链接),取字段值中的“link”文本;
  • 如果是编号 17(附件),遍历元素,取字段值中的“url”文本;
  • 如果是编号 18(单向关联)、21(双向关联),取字段值中的“link_record_ids”文本;
  • 如果是编号 4(多选)、19(引用)、20(公式),取字段值转为字符串;
  • 如果是编号 22(地理位置),取字段值中的“location”文本;
  • 如果是按钮和流程,由于没有返回值,不做处理,忽略即可。
  • 注意:入库如果是 json 类型必须使用json.dumps()进行格式化,否则写入数据库是会报错。
import datetime
def extract_key_fields(feishu_datas, store_fields_info_df):"""处理飞书数据类型编号的数据"""print('开始处理飞书多维表关键字段数据...')# 需要record_id 和 订单号,用于和数据库数据匹配df_feishu = pd.DataFrame(feishu_datas)df_return = pd.DataFrame()#根据列的数据类型,分别处理对应的数据。注意:仅返回以下列举的数据类型,如果fields_map的内容包含按钮、流程等数据类型的飞书列,忽略。for index, row in store_fields_info_df.iterrows():if row['type'] == 1:       #文本df_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x:x.get(row['field_name'],[{}])[0].get('text'))elif row['type'] in (2, 3, 7, 13, 1005):  #数字、单选、复选框、手机号、自动编号df_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x:x.get(row['field_name']))elif row['type'] in (5, 1001, 1002):         #日期(包含创建和更新),需要加 8 小时,即 8*60*60*1000=28800 秒df_return[row['tb_field_name']] = pd.to_datetime(df_feishu['fields'].apply(lambda x:28800 + int(x.get(row['field_name'],1000)/1000)), unit='s')elif row['type'] in(11, 23, 1003, 1004):       #人员、群组、创建人、修改人,遍历取namedf_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x: ','.join([i.get('name') for i in x.get(row['field_name'],[{"name":""}])]))  # 需要遍历elif row['type'] == 15:    #链接df_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x:x.get(row['field_name'],{}).get('link'))elif row['type'] == 17:    #附件,遍历取urldf_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x:json.dumps([i.get('url') for i in x.get(row['field_name'],[{}])])) #需要遍历elif row['type'] in(18, 21):    #单向关联、双向关联,取link_record_idsdf_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x:json.dumps(x.get(row['field_name'],{}).get('link_record_ids')))elif row['type'] in(4, 19, 20):    #多选、查找引用和公式df_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x:json.dumps(x.get(row['field_name'])))elif row['type'] == 22:    #地理位置df_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x:x.get(row['field_name'],{}).get('location'))#加上record_iddf_return['record_id'] = df_feishu.record_id#加上表更新字段df_return['last_modified_time'] = datetime.datetime.now()return df_return# 已知:feishu_datas 和 store_fields_info_df
# feishu_datas 通过 API 获取到的所有飞书的数据, store_fields_info_df 上一小节得到的变量
feishu_df = extract_key_fields(feishu_datas, store_fields_info_df)
feishu_df

最终结果如下,由于列数太多,此处分成三段展示:
image.png

由于数据类型相对比较稳定,所以该逻辑作为固定的处理逻辑写入代码。如果临时需要调整,可以通过定制化处理,下一篇介绍。

3.2.2 迭代二:构建建表 SQL

构建建表 SQL 涉及数据类型映射关系,需要先把该关系和列名映射关系关联,给每一个入库的字段名加上 MySQL 的数据类型。
借用3.2.1 中的merge_list()函数,以“store_fields_info_df”为主表,左连接匹配“data_type_map”中的“mysql_type”。

data_type_map = [{"feishu_type": 1   ,"mysql_type": "varchar(256)"   },{"feishu_type": 2   ,"mysql_type": "FLOAT"          },{"feishu_type": 3   ,"mysql_type": "varchar(256)"   },{"feishu_type": 4   ,"mysql_type": "json"           },{"feishu_type": 5   ,"mysql_type": "datetime"       },{"feishu_type": 7   ,"mysql_type": "BOOL"           },{"feishu_type": 11  ,"mysql_type": "varchar(256)"   },{"feishu_type": 13  ,"mysql_type": "varchar(256)"   },{"feishu_type": 15  ,"mysql_type": "varchar(256)"   },{"feishu_type": 17  ,"mysql_type": "varchar(256)"   },{"feishu_type": 18  ,"mysql_type": "json"           },{"feishu_type": 19  ,"mysql_type": "varchar(256)"   },{"feishu_type": 20  ,"mysql_type": "varchar(256)"   },{"feishu_type": 21  ,"mysql_type": "json"           },{"feishu_type": 22  ,"mysql_type": "varchar(256)"   },{"feishu_type": 23  ,"mysql_type": "varchar(256)"   },{"feishu_type": 1001,"mysql_type": "datetime"       },{"feishu_type": 1002,"mysql_type": "datetime"       },{"feishu_type": 1003,"mysql_type": "varchar(256)"   },{"feishu_type": 1004,"mysql_type": "varchar(256)"   },{"feishu_type": 1005,"mysql_type": "varchar(256)"   }]store_fields_info_df = merge_list(store_fields_info_df, data_type_map, left_on='type', right_on='feishu_type')
store_fields_info_df

结果如下:
image.png

接下来,根据“store_fields_info_df”和表名生成建表语句,参考如下:

def generate_create_ddl(db_table_name, store_fields_info_df):"""生成DDL还需要另外去建表,直接一条龙服务,使用Pyodps建表,并写入数据。下面代码二"""# 代码一:生成DDL语句cre_ddl = f"CREATE TABLE for_ods.{db_table_name} (\n"for index, row in store_fields_info_df.iterrows():cre_ddl += f"    {row['tb_field_name']} {row['mysql_type']} comment '{row['feishu_field_name']}',\n"default_fields = "    record_id varchar(20) comment '行record_id',\n    last_modified_time datetime comment '数据更新时间' \n);"# cre_ddl = cre_ddl[:-2] + "\n);"cre_ddl = cre_ddl + default_fields# print(cre_ddl)return cre_ddl#已知:store_fields_info_df
db_table_name = 'feishu_data_type_test'
cre_ddl = generate_create_ddl(db_table_name, store_fields_info_df)
cre_ddl

执行结果如下:
image.png

看看打印的效果:
image.png

3.2.3 迭代三:传递链接自动提取 app_token、table_id、view_id

迭代完上面两个之后,便可以替换上一篇代码中的extract_key_fields()方法,使得代码拓展性更好!
不过,每次要入库一个多维表都需要手动填写表单的 app_token 和 table_id,为了使得操作更加便捷,通过传递链接,然后从链接中解析对应的字段来完成这两个字段的自动填充。
另外,实际生产中,一个多维表中可能会有多个视图,每个视图可能都会做不同的筛选,以便查看不同维度或颗粒度的数据,所以,通常,我们需要指定视图 ID,避免因为数据被筛选之后取不到完整数据导致数据一致性出问题。同样,也通过链接解析出视图 ID,然后传递给接口。
解析参考代码如下:

from urllib.parse import urlparse, parse_qsdef get_table_params(bitable_url):# bitable_url = "https://feishu.cn/base/aaaaaaaa?table=tblccc&view=vewddd"parsed_url = urlparse(bitable_url)              #解析url:(ParseResult(scheme='https', netloc='feishu.cn', path='/base/aaaaaaaa', params='', query='table=tblccc&view=vewddd', fragment='')query_params = parse_qs(parsed_url.query)       #解析url参数:{'table': ['tblccc'], 'view': ['vewddd']}app_token = parsed_url.path.split('/')[-1]table_id = query_params.get('table', [None])[0]view_id = query_params.get('view', [None])[0]return app_token, table_id, view_idbitable_url = "https://xxxx.feishu.cn/base/PtRdbPjCFa5Og5sry0lcD1yPnKg?table=tblcc5oozF4EOBOE&view=vewVaEFMO6"
app_token, table_id, view_id = get_table_params(bitable_url)
app_token, table_id, view_id

执行结果如下:
image.png

由于上一篇的代码中没有涉及“view_id”,所以需要迭代相关的函数,目前相关的函数有两个:get_bitable_datas()。修改的地方,主要有两个,一个是函数的参数传递,一个是接口的参数传递。前者直接加个参数,保证调用的时候位置对齐即可。后者则是修改下“payload”,修改后的内容参考如下:

def get_bitable_datas(tenant_access_token, app_token, table_id, view_id, page_token='', page_size=20):url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/records/search?page_size={page_size}&page_token={page_token}&user_id_type=user_id"payload = json.dumps({"view_id": view_id})headers = {'Content-Type': 'application/json','Authorization': f'Bearer {tenant_access_token}'}response = requests.request("POST", url, headers=headers, data=payload)# print(response.text)return response.json()

3.3 整合通用版代码

将以上的代码组合到一起,并局部调整顺序,完整代码参考如下,几点说明:

  • 循环获取飞书多维表每一页的数据的代码逻辑单独提取出来封装到新函数get_all_bitable_datas()中;
  • 函数merge_list()不仅仅只是支持两个列表通过 DataFrame 合并在一起,也可以是一个 DataFrame 和一个列表

import requests
import json
import datetime
import pandas as pd
from sqlalchemy import create_engine, text
from urllib.parse import urlparse, parse_qsdef get_table_params(bitable_url):# bitable_url = "https://feishu.cn/base/aaaaaaaa?table=tblccc&view=vewddd"parsed_url = urlparse(bitable_url)              #解析url:(ParseResult(scheme='https', netloc='feishu.cn', path='/base/aaaaaaaa', params='', query='table=tblccc&view=vewddd', fragment='')query_params = parse_qs(parsed_url.query)       #解析url参数:{'table': ['tblccc'], 'view': ['vewddd']}app_token = parsed_url.path.split('/')[-1]table_id = query_params.get('table', [None])[0]view_id = query_params.get('view', [None])[0]print(f'成功解析链接,app_token:{app_token},table_id:{table_id},view_id:{view_id}。关联方法:get_table_params。')return app_token, table_id, view_iddef get_tenant_access_token(app_id, app_secret):url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"payload = json.dumps({"app_id": app_id,"app_secret": app_secret})headers = {'Content-Type': 'application/json'}response = requests.request("POST", url, headers=headers, data=payload)tenant_access_token = response.json()['tenant_access_token']print(f'成功获取tenant_access_token:{tenant_access_token}。关联函数:get_table_params。')return tenant_access_tokendef get_bitable_datas(tenant_access_token, app_token, table_id, view_id, page_token='', page_size=20):url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/records/search?page_size={page_size}&page_token={page_token}&user_id_type=user_id"payload = json.dumps({"view_id": view_id})headers = {'Content-Type': 'application/json','Authorization': f'Bearer {tenant_access_token}'}response = requests.request("POST", url, headers=headers, data=payload)print(f'成功获取page_token为【{page_token}】的数据。关联函数:get_bitable_datas。')return response.json()def get_all_bitable_datas(tenant_access_token, app_token, table_id, view_id, page_token='', page_size=20):has_more = Truefeishu_datas = []while has_more:response = get_bitable_datas(tenant_access_token, app_token, table_id, view_id, page_token, page_size)if response['code'] == 0:page_token = response['data'].get('page_token')has_more = response['data'].get('has_more')# print(response['data'].get('items'))# print('\n--------------------------------------------------------------------\n')feishu_datas.extend(response['data'].get('items'))else:raise Exception(response['msg'])print(f'成功获取飞书多维表所有数据,返回 feishu_datas。关联函数:get_all_bitable_datas。')return feishu_datasdef get_bitable_fields(tenant_access_token, app_token, table_id, page_size=500):url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/fields?page_size={page_size}"payload = ''headers = {'Authorization': f'Bearer {tenant_access_token}'}response = requests.request("GET", url, headers=headers, data=payload)field_infos = response.json().get('data').get('items')print('成功获取飞书字段信息,关联函数:get_bitable_fields。')return field_infosdef merge_list(ls_from, ls_join, on=None, left_on=None, right_on=None):"""将两个[{},{}]结构的数据合并"""df_from = pd.DataFrame(ls_from)df_join = pd.DataFrame(ls_join)if on is not None:df_merge = df_from.merge(df_join, how='left', on=on)else:df_merge = df_from.merge(df_join, how='left', left_on=left_on, right_on=right_on) # , suffixes=('', '_y')print(f'成功合并列表或DataFrame。关联方法:merge_list。')return df_mergedef extract_key_fields(feishu_datas, store_fields_info_df):"""处理飞书数据类型编号的数据"""print('开始处理飞书多维表关键字段数据...')# 需要record_id 和 订单号,用于和数据库数据匹配df_feishu = pd.DataFrame(feishu_datas)df_return = pd.DataFrame()#根据列的数据类型,分别处理对应的数据。注意:仅返回以下列举的数据类型,如果fields_map的内容包含按钮、流程等数据类型的飞书列,忽略。for index, row in store_fields_info_df.iterrows():if row['type'] == 1:       #文本df_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x:x.get(row['field_name'],[{}])[0].get('text'))elif row['type'] in (2, 3, 7, 13, 1005):  #数字、单选、复选框、手机号、自动编号df_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x:x.get(row['field_name']))elif row['type'] in (5, 1001, 1002):         #日期(包含创建和更新),需要加 8 小时,即 8*60*60*1000=28800 秒df_return[row['tb_field_name']] = pd.to_datetime(df_feishu['fields'].apply(lambda x:28800 + int(x.get(row['field_name'],1000)/1000)), unit='s')elif row['type'] in(11, 23, 1003, 1004):       #人员、群组、创建人、修改人,遍历取namedf_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x: ','.join([i.get('name') for i in x.get(row['field_name'],[{"name":""}])]))  # 需要遍历elif row['type'] == 15:    #链接df_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x:x.get(row['field_name'],{}).get('link'))elif row['type'] == 17:    #附件,遍历取urldf_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x:json.dumps([i.get('url') for i in x.get(row['field_name'],[{}])])) #需要遍历elif row['type'] in(18, 21):    #单向关联、双向关联,取link_record_idsdf_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x:json.dumps(x.get(row['field_name'],{}).get('link_record_ids')))elif row['type'] in(4, 19, 20):    #多选、查找引用和公式df_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x:json.dumps(x.get(row['field_name'])))elif row['type'] == 22:    #地理位置df_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x:x.get(row['field_name'],{}).get('location'))#加上record_iddf_return['record_id'] = df_feishu.record_id#加上表更新字段df_return['last_modified_time'] = datetime.datetime.now()print(f'成功提取入库字段的数据。关联方法:extract_key_fields。')return df_returndef generate_create_ddl(db_table_name, store_fields_info_df):"""生成DDL还需要另外去建表,直接一条龙服务,使用Pyodps建表,并写入数据。下面代码二"""# 代码一:生成DDL语句cre_ddl = f"CREATE TABLE if not exists {db_table_name} (\n"for index, row in store_fields_info_df.iterrows():cre_ddl += f"    {row['tb_field_name']} {row['mysql_type']} comment '{row['feishu_field_name']}',\n"default_fields = "record_id varchar(20) comment '行record_id',\nlast_modified_time datetime comment '数据更新时间' \n);"# cre_ddl = cre_ddl[:-2] + "\n);"cre_ddl = cre_ddl + default_fieldsprint(f'成功生成 mysql 建表 DDL。关联方法:generate_create_ddl。')return cre_ddldef cre_mysql_table(create_table_sql, connection_conf_info):"""注意:该函数支持执行任意SQL,而不仅仅是建表 SQL。"""# from sqlalchemy import create_engine, text# 创建 SQLAlchemy engine 对象,这里以 MySQL 为例# engine = create_engine('mysql://username:password@host:port/dbname')# engine = create_engine('mysql://root:password@127.0.0.1:3306/test')engine = create_engine(connection_conf_info)# 定义一个建表的 SQL 语句# create_table_sql = ''''''# 使用 execute() 方法执行 SQL 语句with engine.connect() as connection:connection.execute(text(create_table_sql))print(f'成功执行 mysql 建表语句。关联方法:cre_mysql_table。')def insert_mysql_table(feishu_df, table_name, connection_conf_info):# from sqlalchemy import create_engine# 创建 SQLAlchemy engine 对象,这里以 MySQL 为例# engine = create_engine('mysql://username:password@host:port/dbname')
#     engine = create_engine('mysql://root:password@127.0.0.1:3306/test')engine = create_engine(connection_conf_info)# 将 DataFrame 直接写入 MySQL 数据库feishu_df.to_sql(name=table_name, con=engine, if_exists='append', index=False)print(f'成功将飞书数据写入 mysql 数据表。关联方法:insert_mysql_table。')def main(connection_conf_info, mysql_table_name, bitable_url, fields_map):# 基本配置app_token, table_id, view_id = get_table_params(bitable_url)app_id = 'your_app_id'app_secret = 'your_app_secret'tenant_access_token = get_tenant_access_token(app_id, app_secret)page_size = 5# 获取飞书多维表所有数据feishu_datas = get_all_bitable_datas(tenant_access_token, app_token, table_id, view_id, page_size=page_size)#获取飞书字段信息feishu_fields = get_bitable_fields(tenant_access_token, app_token, table_id)# 以 fields_map 为准关联数据store_fields_info_df = merge_list(fields_map, feishu_fields, left_on='feishu_field_name', right_on='field_name')# 处理入库字段数据feishu_df = extract_key_fields(feishu_datas, store_fields_info_df)# 关联入库数据类型data_type_map = [{"feishu_type": 1   ,"mysql_type": "varchar(256)"   },{"feishu_type": 2   ,"mysql_type": "FLOAT"          },{"feishu_type": 3   ,"mysql_type": "varchar(256)"   },{"feishu_type": 4   ,"mysql_type": "json"           },{"feishu_type": 5   ,"mysql_type": "datetime"       },{"feishu_type": 7   ,"mysql_type": "BOOL"           },{"feishu_type": 11  ,"mysql_type": "varchar(256)"   },{"feishu_type": 13  ,"mysql_type": "varchar(256)"   },{"feishu_type": 15  ,"mysql_type": "varchar(256)"   },{"feishu_type": 17  ,"mysql_type": "varchar(256)"   },{"feishu_type": 18  ,"mysql_type": "json"           },{"feishu_type": 19  ,"mysql_type": "varchar(256)"   },{"feishu_type": 20  ,"mysql_type": "varchar(256)"   },{"feishu_type": 21  ,"mysql_type": "json"           },{"feishu_type": 22  ,"mysql_type": "varchar(256)"   },{"feishu_type": 23  ,"mysql_type": "varchar(256)"   },{"feishu_type": 1001,"mysql_type": "datetime"       },{"feishu_type": 1002,"mysql_type": "datetime"       },{"feishu_type": 1003,"mysql_type": "varchar(256)"   },{"feishu_type": 1004,"mysql_type": "varchar(256)"   },{"feishu_type": 1005,"mysql_type": "varchar(256)"   }]store_fields_info_df = merge_list(store_fields_info_df, data_type_map, left_on='type', right_on='feishu_type')# 生成 MySQL 建表 DDLcreate_table_sql = generate_create_ddl(mysql_table_name, store_fields_info_df)# 建 mysql 数据表cre_mysql_table(create_table_sql, connection_conf_info)# MySQL 表插入数据insert_mysql_table(feishu_df, mysql_table_name, connection_conf_info)db_name = connection_conf_info.split('/')[-1]print(f'成功将飞书多维表({bitable_url})的数据入库到 mysql 数据表:{db_name}.{mysql_table_name}。')if __name__ == '__main__':connection_conf_info = 'mysql://root:password@127.0.0.1:3306/test'mysql_table_name = 'feishu_data_type_test'bitable_url = "https://xxxx.feishu.cn/base/PtRdbPjCFa5Og5sry0lcD1yPnKg?table=tblcc5oozF4EOBOE&view=vewVaEFMO6"fields_map = [{'tb_field_name': 'field_text','feishu_field_name': '文本'},{'tb_field_name': 'field_email','feishu_field_name': 'email'},{'tb_field_name': 'field_select','feishu_field_name': '单选'},{'tb_field_name': 'field_mobile','feishu_field_name': '电话号码'},{'tb_field_name': 'field_no','feishu_field_name': '自动编号'},{'tb_field_name': 'field_member1','feishu_field_name': '人员1'},{'tb_field_name': 'field_group1','feishu_field_name': '群组1'},{'tb_field_name': 'field_creator','feishu_field_name': '创建人'},{'tb_field_name': 'field_modifier','feishu_field_name': '修改人'},{'tb_field_name': 'field_member2','feishu_field_name': '人员2'},{'tb_field_name': 'field_group2','feishu_field_name': '群组2'},{'tb_field_name': 'field_url','feishu_field_name': '超链接'},{'tb_field_name': 'field_location','feishu_field_name': '地理位置'},{'tb_field_name': 'field_findnum','feishu_field_name': '查找引用数值'},{'tb_field_name': 'field_numformula','feishu_field_name': '数字公式'},{'tb_field_name': 'field_number','feishu_field_name': '数字'},{'tb_field_name': 'field_progress','feishu_field_name': '进度'},{'tb_field_name': 'field_money','feishu_field_name': '货币'},{'tb_field_name': 'field_Rating','feishu_field_name': '评分'},{'tb_field_name': 'field_bool','feishu_field_name': '复选框'},{'tb_field_name': 'field_date','feishu_field_name': '日期'},{'tb_field_name': 'field_createdtime','feishu_field_name': '创建时间'},{'tb_field_name': 'field_updatedtime','feishu_field_name': '更新时间'},{'tb_field_name': 'field_mulselect','feishu_field_name': '多选'},{'tb_field_name': 'field_singleunion','feishu_field_name': '单向关联'},{'tb_field_name': 'field_doubleunion','feishu_field_name': '双向关联'},{'tb_field_name': 'field_file','feishu_field_name': '附件'}]main(connection_conf_info, mysql_table_name, bitable_url, fields_map)

最终入库结果如下,符合预期。
image.png

image.png

image.png


一点补充:目前该代码是追加的方式写入,会保留历史已经插入的数据,如果不要历史数据,保留最后一次插入的数据即可。可以通过者清空表数据再插入数据,或者在调用建表方法之前删表,再重建,再插入数据这两种方式。

  • 清空表可以使用以下 SQL,通过调用cre_mysql_table()函数执行 SQL;
truncate_table_sql = 'truncate table {mysql_table_name};'
cre_mysql_table(truncate_table_sql, connection_conf_info)
  • 删除表可以使用以下 SQL,通过调用cre_mysql_table()函数执行 SQL。
drop_table_sql = 'drop table if exists {mysql_table_name};'
cre_mysql_table(drop_table_sql, connection_conf_info)
  • 当然,删表重建也可以直接将“if_exists”参数(是在insert_mysql_table()函数中调用的to_sql()方法中的一个参数)改为“replace”,它会在插入数据前,先删表重建,但是它无法保证新建的 MySQL 数据表各个字段的数据类型。

四、小结

本文完成了飞书多维表数据写入 MySQL 数据库的通用版本代码的开发,经过对代码进行重构,把三组核心关系:飞书列名和数据类型、飞书列名和入库表单字段名、飞书数据类型和入库字段的数据类型解耦出来,使得代码通用性更强。

  • 第一组关系借助飞书获取字段信息的接口解决;
  • 第二组关系又用户使用时进行指定;
  • 第三组关系比较固定,保持不变即可。

经过本文改造,目前只需要提供必备的应用信息(app_id和app_secret)、数据库连接配置、多维表链接和需要入库的列名及英文名即可完成对多维表数据进行入库。
ps:有个大前提!不要忘了给应用开通多维表的读取权限!

完整代码适用范围:

  • 飞书多维表入库 MySQL 数据库,使用者需要拥有飞书应用权限、MySQL 数据库的 SELECT、INSERT、DELETE、CREATE、DROP 权限
  • 写入规则是追加,如果只需保留最新版本数据,可以在插入数据前对数据进行清空,或者删表重建,参考最后的补充内容。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/319320.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ubuntu编译安装MariaDB并进行初始化配置

Ubuntu编译安装MariaDB并进行初始化配置 1. 编译安装MariaDB2. 配置MariaDB3. Docker安装MariaDB 1. 编译安装MariaDB MariaDB官方安装文档:https://mariadb.com/kb/en/Build_Environment_Setup_for_Linux/    下载MariaDB源码:https://mariadb.org/ma…

022、Python+fastapi,第一个Python项目走向第22步:ubuntu 24.04 docker 安装mysql8集群、redis集群(三)

这次来安装mysql8了,以前安装不是docker安装,这个我也是第一次,人人都有第一次嚒 前言 前面的redis安装还是花了点时间的,主要是网上教程,各有各的好,大家千万别取其长处,个人觉得这个环境影响…

【C语言】分支和循环(上)

【C语言】分支和循环(上) 1、if语句1.2 else1.3分支中包含多条语句1.4嵌套if1.5悬空else问题 2、关系操作符3、条件操作符4、逻辑操作符:与、或、非(取反)(&&,||,&#xff0…

深入理解网络原理3----TCP核心特性介绍(上)【面试高频考点】

文章目录 前言TCP协议段格式一、确认应答【保证可靠性传输的机制】二、超时重传【保证可靠性传输的机制】三、连接管理机制【保证可靠性传输的机制】3.1建立连接(TCP三次握手)---经典面试题3.2断开连接(四次挥手)3.3TCP状态转换 四…

「 网络安全常用术语解读 」通用漏洞报告框架CVRF详解

1. 背景 ICASI在推进多供应商协调漏洞披露方面处于领先地位,引入了通用漏洞报告框架(Common Vulnerability Reporting Format,CVRF)标准,制定了统一安全事件响应计划(USIRP)的原则,…

JSP与JavaBean

目录 一、JavaBean是什么 二、创建JavaBean 三、在JSP中使用JavaBean 1、按照Java语法直接使用 2、<jsp:useBean>动作 Bean的加载原理 scope属性的不同取值 3、<jsp:setProperty>动作 设置为一个表达式的值或字符序列 通过表单的参数的值来设置Bean的相应…

Find My无人机|苹果Find My技术与无人机结合,智能防丢,全球定位

无人机是利用无线电遥控设备和自备的程序控制装置操纵的不载人飞机&#xff0c;或者由车载计算机完全地或间歇地自主地操作。无人机按应用领域&#xff0c;可分为军用与民用。军用方面&#xff0c;无人机分为侦察机和靶机。民用方面&#xff0c;无人机行业应用&#xff0c;是无…

43 单例模式

目录 1.什么是单例模式 2.什么是设计模式 3.特点 4.饿汉和懒汉 5.峨汉实现单例 6.懒汉实现单例 7.懒汉实现单例&#xff08;线程安全&#xff09; 8.STL容器是否线程安全 9.智能指针是否线程安全 10.其他常见的锁 11.读者写者问题 1. 什么是单例模式 单例模式是一种经典的&a…

多多搜索推广计划怎么设置

拼多多推广可以使用3an推客。3an推客&#xff08;CPS模式&#xff09;给商家提供的营销工具&#xff0c;由商家自主设置佣金比例&#xff0c;激励推广者去帮助商家推广商品链接&#xff0c;按最终有效交易金额支付佣金&#xff0c;不成交不扣费。是商家破零、积累基础销量的重要…

自动化机器学习——网格搜索法:寻找最佳超参数组合

自动化机器学习——网格搜索法&#xff1a;寻找最佳超参数组合 在机器学习中&#xff0c;选择合适的超参数是模型调优的关键步骤之一。然而&#xff0c;由于超参数的组合空间通常非常庞大&#xff0c;手动调整超参数往往是一项耗时且困难的任务。为了解决这个问题&#xff0c;…

连接HiveMQ代理器实现MQTT协议传输

先下载MQTTX: MQTTX: Your All-in-one MQTT Client Toolbox 使用线上免费的MQTTX BROKER:The Free Global Public MQTT Broker | Try Now | EMQ 打开MQTTX&#xff0c;创建连接&#xff0c;点击NEW SUBSCRIPTION,创建一个主题&#xff0c;这里使用test/topic,在下面Json中填写…

使用 ORPO 微调 Llama 3

原文地址&#xff1a;https://towardsdatascience.com/fine-tune-llama-3-with-orpo-56cfab2f9ada 更便宜、更快的统一微调技术 2024 年 4 月 19 日 ORPO 是一种新的令人兴奋的微调技术&#xff0c;它将传统的监督微调和偏好校准阶段合并为一个过程。这减少了训练所需的计算…

Java零基础入门到精通_Day 8

1.API 应用程序接口 Java API:指的就是JDK 中提供的各种功能的Java类这些类将底层的实现封装了起来&#xff0c;我们不需要关心这些类是如何实现的&#xff0c;只需要学习这些类如何使用即可&#xff0c;我们可以通过帮助文档来学习这些API如何使用。 2. String String 类…

【副本向】Lua副本逻辑

副本生命周期 OnCopySceneTick() 子线程每次心跳调用 --副本心跳 function x3323_OnCopySceneTick(elapse)if x3323_g_IsPlayerEnter 0 thenreturn; -- 如果没人进入&#xff0c;则函数直接返回endif x3323_g_GameOver 1 thenif x3323_g_EndTick > 0 thenx3323_CountDown…

循环神经网络完整实现(Pytorch 13)

一 循环神经网络的从零开始实现 从头开始基于循环神经网络实现字符级语言模型。 %matplotlib inline import math import torch from torch import nn from torch.nn import functional as F from d2l import torch as d2lbatch_size, num_steps 32, 35 train_iter, vocab …

分布式websocket IM即时通讯聊天开源项目如何启动

前言 自己之前分享了分布式websocket的视频有同学去fork项目了&#xff0c;自己启动一下更方便理解项目嘛。然后把项目启动需要的东西全部梳理出来。支持群聊单聊,表情包以及发送图片。 支持消息可靠&#xff0c;消息防重&#xff0c;消息有序。同时基础架构有分布式权限&…

OneFlow深度学习框原理、用法、案例和注意事项

本文将基于OneFlow深度学习框架&#xff0c;详细介绍其原理、用法、案例和注意事项。OneFlow是由中科院计算所自动化研究所推出的深度学习框架&#xff0c;专注于高效、易用和扩展性强。它提供了一种类似于深度学习库的接口&#xff0c;可以用于构建神经网络模型&#xff0c;并…

Android4.4真机移植过程笔记(三)

如果文章字体看得不是很清楚&#xff0c;大家可以下载pdf文档查看&#xff0c;文档已上传&#xff5e;oo&#xff5e; 7、安装加密APK 需要修改文件如下&#xff1a; 相对Android4.2改动还是蛮大的&#xff0c;有些文件连路径都变了: //Android4.2 1、frameworks/native/libs…

STL速查

容器 (Containers) 图解容器 支持随机访问 stringarrayvectordeque支持支持支持支持 string 类 构造函数 string(); ------创建一个空的字符串 例如: string str;string(const char* s); ------使用字符串s初始化string(const string& str); ------拷贝构造 赋值操作…

C++学习--点滴记录011

11函数提高 11.1 函数默认参数 在C中&#xff0c;函数的形参列表中的形参可以有默认值 语法&#xff1a; 返回值类型 函数名 &#xff08;参数 默认值&#xff09;{} 示例&#xff1a; #include <iostream> using namespace std;int func(int a, int b 10, int c …