diff --git a/.gitignore b/.gitignore
deleted file mode 100644
index 9cc2d34..0000000
--- a/.gitignore
+++ /dev/null
@@ -1,66 +0,0 @@
-# Byte-compiled / optimized / DLL files
-__pycache__/
-*.py[cod]
-
-# C extensions
-*.so
-
-# Distribution / packaging
-.Python
-env/
-build/
-develop-eggs/
-dist/
-downloads/
-eggs/
-.eggs/
-lib/
-lib64/
-parts/
-sdist/
-var/
-*.egg-info/
-.installed.cfg
-*.egg
-
-# PyInstaller
-# Usually these files are written by a python script from a template
-# before PyInstaller builds the exe, so as to inject date/other infos into it.
-*.manifest
-*.spec
-
-# Installer logs
-pip-log.txt
-pip-delete-this-directory.txt
-
-# Unit test / coverage reports
-htmlcov/
-.tox/
-.coverage
-.coverage.*
-.cache
-nosetests.xml
-coverage.xml
-*,cover
-
-# Translations
-*.mo
-*.pot
-
-# Django stuff:
-*.log
-
-# Sphinx documentation
-docs/_build/
-
-# PyBuilder
-target/
-
-*.yaml
-
-# virtualenv
-venv/
-ENV/
-
-# Pycharm settings
-*.idea/
\ No newline at end of file
diff --git a/LICENSE b/LICENSE
deleted file mode 100644
index dbd643c..0000000
--- a/LICENSE
+++ /dev/null
@@ -1,22 +0,0 @@
-The MIT License (MIT)
-
-Copyright (c) by Windfarer
-
-Permission is hereby granted, free of charge, to any person obtaining a copy
-of this software and associated documentation files (the "Software"), to deal
-in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-copies of the Software, and to permit persons to whom the Software is
-furnished to do so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in all
-copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-SOFTWARE.
-
diff --git a/MANIFEST.in b/MANIFEST.in
deleted file mode 100644
index c625213..0000000
--- a/MANIFEST.in
+++ /dev/null
@@ -1,2 +0,0 @@
-include README.md LICENSE
-recursive-include src *.yaml
\ No newline at end of file
diff --git a/README.md b/README.md
index 2abf719..0cc246d 100644
--- a/README.md
+++ b/README.md
@@ -1,73 +1,70 @@
-# py-mysql-elasticsearch-sync
-Simple and fast MySQL to Elasticsearch sync tool, written in Python.
+## Description
-[中文文档](https://github.com/zhongbiaodev/py-mysql-elasticsearch-sync/blob/master/README_CN.md)
+ 
-## Introduction
-This tool helps you to initialize MySQL dump table to Elasticsearch by parsing mysqldump, then incremental sync MySQL table to Elasticsearch by processing MySQL Binlog.
-Also, during the binlog syncing, this tool will save the binlog sync position, so that it is easy to recover after this tool being shutdown for any reason.
+This project replicates MySQL to Elasticsearch by reading binlog events and converting them to Python objects and then to JSON documents.
-## Installation
-By following these steps.
+Note that the project does not support HTTPS when migrating.
-##### 1. ibxml2 and libxslt
-This tool depends on python lxml package, so that you should install the lxml's dependecies correctly, the libxml2 and libxslt are required.
+The project is a modified version of [py-mysql-elasticsearch-sync](https://github.com/zhongbiaodev/py-mysql-elasticsearch-sync), using the [python-mysql-replication](https://github.com/julien-duponchelle/python-mysql-replication) library.
-For example, in CentOS:
-```
-sudo yum install libxml2 libxml2-devel libxslt libxslt-devel
-```
Or in Debian/Ubuntu:
+
+## Give it a try!
Build and start the containers with:
```
-sudo apt-get install libxml2-dev libxslt-dev python-dev
+docker-compose up
```
-See [lxml Installation](http://lxml.de/installation.html) for more infomation.
-##### 2. mysqldump
-And then, mysqldump is required in the machine where this tool will be run on it.(and the mysql server must enable binlog)
-##### 3. this tool -Then install this tool +stop the elasticsearch container and modify /usr/share/elasticsearch/config/elasticsearch.yml as: ``` -pip install py-mysql-elasticsearch-sync +(edit true to false) xpack.security.enabled: false + +(at the end of the file add this) action.destructive_requires_name: false ``` -## Configuration -There is a [sample config](https://github.com/zhongbiaodev/py-mysql-elasticsearch-sync/blob/master/es_sync/sample.yaml) file in repo, you can start by editing it. -## Running -Simply run command +
+
Restart the Elasticsearch container, then run the following in the pymyelarepl container:
```
(for the test) cd pymyelarepl && python test/test_basic.py
+
+
(for the example, after executing the following SQL in the mysql container as root, no password) cd pymyelarepl && python example/run.py
+
+CREATE DATABASE test;
+use test;
+CREATE TABLE test4 (id int NOT NULL AUTO_INCREMENT, data VARCHAR(255), data2 VARCHAR(255), PRIMARY KEY(id));
+INSERT INTO test4 (data, data2) VALUES ("Hello", "World");
+UPDATE test4 SET data="World", data2="Hello" WHERE id=1;
+DELETE FROM test4 WHERE id=1;
```
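For reference, `example/run.py` (added later in this diff) only loads the YAML config and starts the replication loop. A minimal equivalent sketch, assuming a config file like `example/config.yaml` sits next to the script, looks like this:

```python
import os

from pymyelarepl import PyMyElaRepl

# Point PyMyElaRepl at a config file (MySQL/Elasticsearch hosts, server_id,
# binlog file/position, blocking flag) and start reading binlog events.
config_path = os.path.join(os.path.dirname(__file__), 'config.yaml')
PyMyElaRepl(config_path).run()
```

With `blocking: True` in the config, `run()` keeps waiting for new binlog events instead of returning, which is why the example run blocks below.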
-then +the results of the two cases are similar to the below, which means data is replicated successfully: ``` -es-sync path/to/your/config.yaml --fromfile -``` -to start sync, when xml sync is over, it will also start binlog sync. +{'errors': False, 'took': 12, 'items': [{'create': {'_index': 'basic_replication', '_id': '1', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 6, '_primary_term': 1, 'status': 201}}, {'create': {'_index': 'basic_replication', '_id': '2', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 7, '_primary_term': 1, 'status': 201}}]} +{'errors': False, 'took': 11, 'items': [{'update': {'_index': 'basic_replication', '_id': '1', '_version': 2, 'result': 'updated', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 8, '_primary_term': 1, 'status': 200}}, {'update': {'_index': 'basic_replication', '_id': '2', '_version': 2, 'result': 'updated', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 9, '_primary_term': 1, 'status': 200}}]} +{'errors': False, 'took': 2, 'items': [{'delete': {'_index': 'basic_replication', '_id': '1', '_version': 3, 'result': 'deleted', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 10, '_primary_term': 1, 'status': 200}}, {'delete': {'_index': 'basic_replication', '_id': '2', '_version': 3, 'result': 'deleted', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 11, '_primary_term': 1, 'status': 200}}]} -## Deployment -We provide an [upstart script]((https://github.com/zhongbiaodev/py-mysql-elasticsearch-sync/blob/master/upstart.conf)) to help you deploy this tool, you can edit it for your own condition, besides, you can deploy it in your own way. +(blocked for the example) +``` -## MultiTable Supporting -Now Multi-table is supported through setting tables in config file, the first table is master as default and the others are slave. -Master table and slave tables must use the same primary key, which is defined via _id. +
-Table has higher priority than tables. +## License -## TODO -- [ ] MultiIndex Supporting +MIT \ No newline at end of file diff --git a/README_CN.md b/README_CN.md deleted file mode 100644 index e2d05e4..0000000 --- a/README_CN.md +++ /dev/null @@ -1,73 +0,0 @@ -# py-mysql-elasticsearch-sync -一个从MySQL向Elasticsearch同步数据的工具,使用Python实现。 - -## 简介 -在第一次初始化数据时,本工具解析mysqldump导出的数据,并导入ES中,在后续增量更新中,解析binlog的数据,对ES中的数据进行同步。在binlog同步阶段,支持断点恢复,因此无需担心意外中断的问题。 - -## 安装 - -##### 1. ibxml2 和 libxslt -本工具基于lxml库,因此需要安装它的依赖的libxml2和libxslt - -在CentOS中: - -``` -sudo yum install libxml2 libxml2-devel libxslt libxslt-devel -``` - -在Debian/Ubuntu中: - -``` -sudo apt-get install libxml2-dev libxslt-dev python-dev -``` - -查看[lxml Installation](http://lxml.de/installation.html)来获取更多相关信息 - -##### 2. mysqldump -在运行本工具的机器上需要有mysqldump,并且mysql服务器需要开启binlog功能。 - - -##### 3. 本工具 -安装本工具 - -``` -pip install py-mysql-elasticsearch-sync -``` - -## 配置 -你可以通过修改[配置文件示例](https://github.com/zhongbiaodev/py-mysql-elasticsearch-sync/blob/master/es_sync/sample.yaml)来编写自己的配置文件 - -## 运行 -运行命令 - -``` -es-sync path/to/your/config.yaml -``` -工具将开始执行mysqldump并解析流进行同步,当dump结束后,将启动binlog同步 - -最近一次binlog同步位置记录在一个文件中,这个文件的路径在config文件中配置过。 - -你可以删除记录文件来从头进行binlog同步,或者修改文件里的内容,来从特定位置开始同步。 - - -你也可以把自己从mysql导出的xml文件同步进ES中(在mysqldump的命令中加上参数```-X```即可导出xml) - -然后执行 - -``` -es-sync path/to/your/config.yaml --fromfile -``` -启动从xml导入,当从xml导入完毕后,它会开始同步binlog - -## 服务管理 -我们写了一个[upstart脚本](https://github.com/zhongbiaodev/py-mysql-elasticsearch-sync/blob/master/upstart.conf)来管理本工具的运行,你也可以用你自己的方式进行部署运行 - -## 多表支持 -你可以在config文件中配置tables以支持多表,默认tables中第一张表为主表,其余表为从表。 - -主表和从表主键必须相同,均为_id字段。 - -当同时设置table和tables时,table优先级较高。 - -## TODO -- [ ] 多索引支持 diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..f359d6d --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,48 @@ +version: '3.8' + +services: + mysql-8.1.0: + image: mysql:8.1.0 + ports: + - "3307:3306" + environment: + MYSQL_ALLOW_EMPTY_PASSWORD: true + command: > + mysqld + --log-bin=mysql-bin.log + --server-id 1 + --binlog-format=row + --gtid_mode=on + --enforce-gtid-consistency=on + networks: + network: + ipv4_address: 172.0.0.2 + + elasticsearch-8.10.2: + image: elasticsearch:8.10.2 + ports: + - "9201:9200" + networks: + network: + ipv4_address: 172.0.0.3 + + pymyelarepl: + build: + context: . 
+ dockerfile: pymyelarepl.Dockerfile + args: + BASE_IMAGE: python:3.12.0rc3-bookworm + command: sleep infinity + ports: + - "3001:3000" + networks: + network: + ipv4_address: 172.0.0.4 + +networks: + network: + driver: bridge + ipam: + config: + - subnet: 172.0.0.0/24 + gateway: 172.0.0.1 \ No newline at end of file diff --git a/es_sync/__init__.py b/es_sync/__init__.py deleted file mode 100644 index eb92e77..0000000 --- a/es_sync/__init__.py +++ /dev/null @@ -1,520 +0,0 @@ -from __future__ import print_function, unicode_literals -from future.builtins import str, range -import sys - -PY2 = sys.version_info[0] == 2 - -if PY2: - import os - DEVNULL = open(os.devnull, 'wb') -else: - from subprocess import DEVNULL - - -def encode_in_py2(s): - if PY2: - return s.encode('utf-8') - return s - -import os.path -import yaml -import signal -import requests -import subprocess -import json -import logging -import shlex -import datetime -import decimal -from lxml.etree import iterparse -from functools import reduce -from pymysqlreplication import BinLogStreamReader -from pymysqlreplication.row_event import DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent -from pymysqlreplication.event import RotateEvent, XidEvent - -__version__ = '0.4.2' - - -# The magic spell for removing invalid characters in xml stream. -REMOVE_INVALID_PIPE = r'tr -d "\00\01\02\03\04\05\06\07\10\13\14\16\17\20\21\22\23\24\25\26\27\30\31\32\33\34\35\36\37"' - -DEFAULT_BULKSIZE = 100 -DEFAULT_BINLOG_BULKSIZE = 1 - - -class ElasticSync(object): - table_structure = {} - log_file = None - log_pos = None - - @property - def is_binlog_sync(self): - rv = bool(self.log_file and self.log_pos) - return rv - - def __init__(self): - try: - self.config = yaml.load(open(sys.argv[1])) - except IndexError: - print('Error: not specify config file') - exit(1) - - mysql = self.config.get('mysql') - if mysql.get('table'): - self.tables = [mysql.get('table')] - self.dump_cmd = 'mysqldump -h {host} -P {port} -u {user} --password={password} {db} {table} ' \ - '--default-character-set=utf8 -X --opt --quick'.format(**mysql) - elif mysql.get('tables'): - self.tables = mysql.get('tables') - mysql.update({ - 'tables': ' '.join(mysql.get('tables')) - }) - self.dump_cmd = 'mysqldump -h {host} -P {port} -u {user} --password={password} --database {db} --tables {tables} ' \ - '--default-character-set=utf8 -X --opt --quick'.format(**mysql) - else: - print('Error: must specify either table or tables') - exit(1) - self.master = self.tables[0] # use the first table as master - self.current_table = None - - self.binlog_conf = dict( - [(key, self.config['mysql'][key]) for key in ['host', 'port', 'user', 'password', 'db']] - ) - - self.endpoint = 'http://{host}:{port}/{index}/{type}/_bulk'.format( - host=self.config['elastic']['host'], - port=self.config['elastic']['port'], - index=self.config['elastic']['index'], - type=self.config['elastic']['type'] - ) # todo: supporting multi-index - - self.mapping = self.config.get('mapping') or {} - if self.mapping.get('_id'): - self.id_key = self.mapping.pop('_id') - else: - self.id_key = None - - self.ignoring = self.config.get('ignoring') or [] - - record_path = self.config['binlog_sync']['record_file'] - if os.path.isfile(record_path): - with open(record_path, 'r') as f: - record = yaml.load(f) - self.log_file = record.get('log_file') - self.log_pos = record.get('log_pos') - - self.bulk_size = self.config.get('elastic').get('bulk_size') or DEFAULT_BULKSIZE - self.binlog_bulk_size = 
self.config.get('elastic').get('binlog_bulk_size') or DEFAULT_BINLOG_BULKSIZE - - self._init_logging() - self._force_commit = False - - def _init_logging(self): - logging.basicConfig(filename=self.config['logging']['file'], - level=logging.INFO, - format='[%(levelname)s] - %(filename)s[line:%(lineno)d] - %(asctime)s %(message)s') - self.logger = logging.getLogger(__name__) - logging.getLogger("requests").setLevel(logging.WARNING) # disable requests info logging - - def cleanup(*args): - self.logger.info('Received stop signal') - self.logger.info('Shutdown') - sys.exit(0) - - signal.signal(signal.SIGINT, cleanup) - signal.signal(signal.SIGTERM, cleanup) - - def _post_to_es(self, data): - """ - send post requests to es restful api - """ - resp = requests.post(self.endpoint, data=data) - if resp.json().get('errors'): # a boolean to figure error occurs - for item in resp.json()['items']: - if list(item.values())[0].get('error'): - logging.error(item) - else: - self._save_binlog_record() - - def _bulker(self, bulk_size): - """ - Example: - u = bulker() - u.send(None) #for generator initialize - u.send(json_str) # input json item - u.send(another_json_str) # input json item - ... - u.send(None) force finish bulk and post - """ - while True: - data = "" - for i in range(bulk_size): - item = yield - if item: - data = data + item + "\n" - else: - break - if self._force_commit: - break - # print(data) - print('-'*10) - if data: - self._post_to_es(data) - - self._force_commit = False - - def _updater(self, data): - """ - encapsulation of bulker - """ - if self.is_binlog_sync: - u = self._bulker(bulk_size=self.binlog_bulk_size) - else: - u = self._bulker(bulk_size=self.bulk_size) - - u.send(None) # push the generator to first yield - for item in data: - u.send(item) - u.send(None) # tell the generator it's the end - - def _json_serializer(self, obj): - """ - format the object which json not supported - """ - if isinstance(obj, datetime.datetime) or isinstance(obj, datetime.date): - return obj.isoformat() - elif isinstance(obj, decimal.Decimal): - return str(obj) - raise TypeError('Type not serializable for obj {obj}'.format(obj=obj)) - - def _processor(self, data): - """ - The action must be one of the following: - create - Create a document only if the document does not already exist. - index - Create a new document or replace an existing document. - update - Do a partial update on a document. - delete - Delete a document. 
- """ - for item in data: - if self.id_key: - action_content = {'_id': item['doc'][self.id_key]} - else: - action_content = {} - for field in self.ignoring: - try: - item['doc'].pop(field) - except KeyError: - pass - meta = json.dumps({item['action']: action_content}) - if item['action'] == 'index': - body = json.dumps(item['doc'], default=self._json_serializer) - rv = meta + '\n' + body - elif item['action'] == 'update': - body = json.dumps({'doc': item['doc']}, default=self._json_serializer) - rv = meta + '\n' + body - elif item['action'] == 'delete': - rv = meta + '\n' - elif item['action'] == 'create': - body = json.dumps(item['doc'], default=self._json_serializer) - rv = meta + '\n' + body - else: - logging.error('unknown action type in doc') - raise TypeError('unknown action type in doc') - yield rv - - def _mapper(self, data): - """ - mapping old key to new key - """ - for item in data: - if self.mapping: - for k, v in self.mapping.items(): - try: - item['doc'][k] = item['doc'][v] - del item['doc'][v] - except KeyError: - continue - # print(doc) - yield item - - def _formatter(self, data): - """ - format every field from xml, according to parsed table structure - """ - for item in data: - for field, serializer in self.table_structure.items(): - if field in item['doc'] and item['doc'][field]: - try: - item['doc'][field] = serializer(item['doc'][field]) - except ValueError as e: - self.logger.error( - "Error occurred during format, ErrorMessage:{msg}, ErrorItem:{item}".format( - msg=str(e), - item=str(item))) - item['doc'][field] = None - except TypeError as e: - item['doc'][field] = None - # print(item) - yield item - - def _binlog_loader(self): - """ - read row from binlog - """ - if self.is_binlog_sync: - resume_stream = True - logging.info("Resume from binlog_file: {file} binlog_pos: {pos}".format(file=self.log_file, - pos=self.log_pos)) - else: - resume_stream = False - - stream = BinLogStreamReader(connection_settings=self.binlog_conf, - server_id=self.config['mysql']['server_id'], - only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent, RotateEvent, XidEvent], - only_tables=self.tables, - resume_stream=resume_stream, - blocking=True, - log_file=self.log_file, - log_pos=self.log_pos) - for binlogevent in stream: - self.log_file = stream.log_file - self.log_pos = stream.log_pos - - # RotateEvent to update binlog record when no related table changed - if isinstance(binlogevent, RotateEvent): - self._save_binlog_record() - continue - - if isinstance(binlogevent, XidEvent): # event_type == 16 - self._force_commit = True - continue - - for row in binlogevent.rows: - if isinstance(binlogevent, DeleteRowsEvent): - if binlogevent.table == self.master: - rv = { - 'action': 'delete', - 'doc': row['values'] - } - else: - rv = { - 'action': 'update', - 'doc': {k: row['values'][k] if self.id_key and self.id_key == k else None for k in row['values']} - } - elif isinstance(binlogevent, UpdateRowsEvent): - rv = { - 'action': 'update', - 'doc': row['after_values'] - } - elif isinstance(binlogevent, WriteRowsEvent): - if binlogevent.table == self.master: - rv = { - 'action': 'create', - 'doc': row['values'] - } - else: - rv = { - 'action': 'update', - 'doc': row['values'] - } - else: - logging.error('unknown action type in binlog') - raise TypeError('unknown action type in binlog') - yield rv - # print(rv) - stream.close() - raise IOError('mysql connection closed') - - def _parse_table_structure(self, data): - """ - parse the table structure - """ - for item in data.iter(): - if item.tag 
== 'field': - field = item.attrib.get('Field') - type = item.attrib.get('Type') - if 'int' in type: - serializer = int - elif 'float' in type: - serializer = float - elif 'datetime' in type: - if '(' in type: - serializer = lambda x: datetime.datetime.strptime(x, '%Y-%m-%d %H:%M:%S.%f') - else: - serializer = lambda x: datetime.datetime.strptime(x, '%Y-%m-%d %H:%M:%S') - elif 'char' in type: - serializer = str - elif 'text' in type: - serializer = str - else: - serializer = str - self.table_structure[field] = serializer - - def _parse_and_remove(self, f, path): - """ - snippet from python cookbook, for parsing large xml file - """ - path_parts = path.split('/') - doc = iterparse(f, ('start', 'end'), recover=False, encoding='utf-8', huge_tree=True) - # Skip the root element - next(doc) - tag_stack = [] - elem_stack = [] - for event, elem in doc: - if event == 'start': - if elem.tag == 'table_data': - self.current_table = elem.attrib['name'] - tag_stack.append(elem.tag) - elem_stack.append(elem) - elif event == 'end': - if tag_stack == ['database', 'table_data']: - self.current_table = None - if tag_stack == path_parts: - yield elem - elem_stack[-2].remove(elem) - if tag_stack == ['database', 'table_structure']: - # dirty hack for getting the tables structure - self._parse_table_structure(elem) - elem_stack[-2].remove(elem) - try: - tag_stack.pop() - elem_stack.pop() - except IndexError: - pass - - def _xml_parser(self, f_obj): - """ - parse mysqldump XML streaming, convert every item to dict object. - 'database/table_data/row' - """ - for row in self._parse_and_remove(f_obj, 'database/table_data/row'): - doc = {} - for field in row.iter(tag='field'): - k = field.attrib.get('name') - v = field.text - doc[k] = v - if not self.current_table or self.current_table == self.master: - yield {'action': 'create', 'doc': doc} - else: - yield {'action': 'update', 'doc': doc} - - def _save_binlog_record(self): - if self.is_binlog_sync: - with open(self.config['binlog_sync']['record_file'], 'w') as f: - logging.info("Sync binlog_file: {file} binlog_pos: {pos}".format( - file=self.log_file, - pos=self.log_pos) - ) - yaml.safe_dump({"log_file": self.log_file, - "log_pos": self.log_pos}, - f, - default_flow_style=False) - - def _xml_dump_loader(self): - mysqldump = subprocess.Popen( - shlex.split(encode_in_py2(self.dump_cmd)), - stdout=subprocess.PIPE, - stderr=DEVNULL, - close_fds=True) - - remove_invalid_pipe = subprocess.Popen( - shlex.split(encode_in_py2(REMOVE_INVALID_PIPE)), - stdin=mysqldump.stdout, - stdout=subprocess.PIPE, - stderr=DEVNULL, - close_fds=True) - - return remove_invalid_pipe.stdout - - def _xml_file_loader(self, filename): - f = open(filename, 'rb') # bytes required - - remove_invalid_pipe = subprocess.Popen( - shlex.split(encode_in_py2(REMOVE_INVALID_PIPE)), - stdin=f, - stdout=subprocess.PIPE, - stderr=DEVNULL, - close_fds=True) - return remove_invalid_pipe.stdout - - def _send_email(self, title, content): - """ - send notification email - """ - if not self.config.get('email'): - return - - import smtplib - from email.mime.text import MIMEText - - msg = MIMEText(content) - msg['Subject'] = title - msg['From'] = self.config['email']['from']['username'] - msg['To'] = ', '.join(self.config['email']['to']) - - # Send the message via our own SMTP server. 
- s = smtplib.SMTP() - s.connect(self.config['email']['from']['host']) - s.login(user=self.config['email']['from']['username'], - password=self.config['email']['from']['password']) - s.sendmail(msg['From'], msg['To'], msg=msg.as_string()) - s.quit() - - def _sync_from_stream(self): - logging.info("Start to dump from stream") - docs = reduce(lambda x, y: y(x), [self._xml_parser, - self._formatter, - self._mapper, - self._processor], - self._xml_dump_loader()) - self._updater(docs) - logging.info("Dump success") - - def _sync_from_file(self): - logging.info("Start to dump from xml file") - logging.info("Filename: {}".format(self.config['xml_file']['filename'])) - docs = reduce(lambda x, y: y(x), [self._xml_parser, - self._formatter, - self._mapper, - self._processor], - self._xml_file_loader(self.config['xml_file']['filename'])) - self._updater(docs) - logging.info("Dump success") - - def _sync_from_binlog(self): - logging.info("Start to sync binlog") - docs = reduce(lambda x, y: y(x), [self._mapper, - self._processor], - self._binlog_loader()) - self._updater(docs) - - def run(self): - """ - workflow: - 1. sync dump data - 2. sync binlog - """ - try: - if not self.is_binlog_sync: - if len(sys.argv) > 2 and sys.argv[2] == '--fromfile': - self._sync_from_file() - else: - self._sync_from_stream() - self._sync_from_binlog() - except Exception: - import traceback - logging.error(traceback.format_exc()) - self._send_email('es sync error', traceback.format_exc()) - raise - - -def start(): - instance = ElasticSync() - instance.run() - -if __name__ == '__main__': - start() diff --git a/es_sync/sample.yaml b/es_sync/sample.yaml deleted file mode 100644 index 5a6a64f..0000000 --- a/es_sync/sample.yaml +++ /dev/null @@ -1,51 +0,0 @@ -# The mysql database which you want to sync -mysql: - host: 127.0.0.1 - port: 3306 - user: foo - password: bar - db: mydb - table: mytable - tables: # support multi-table here, you can set tables instead of table and the first one will be set to master as default - - table1 - server_id: 1 # this should be unique - -elastic: - host: 127.0.0.1 - port: 9200 - bulk_size: 200 # the update bulk size when mysqldump, default is 100 if not specified - binlog_bulk_size: 10 # the update bulk size when syncing binlog, default is 1 if not specified - index: article - type: article - -# path to your own xml file, if you want to initialize dump from xml file. run with argument --fromfile in command -xml_file: - filename: a.xml - -# If you want to map your column, put the column name as the value, and es field name as the key, -# Particularly , if you set _id as follows, it will use myid column as the index doc's id, or ES will generate an id as default -mapping: - _id: myid - es_field_name: mysql_column_name - -# You can set ignoring fields here, and these fields will not be post to ES. 
-ignoring: - - ignoring_field - -# The log file's path -logging: - file: mylog.log - -# The record file's path, which record the latest synced binlog file and position -binlog_sync: - record_file: binlog.info - -# If you want to email notification when error occurs, fill this -email: - from: # the sender's email, uses smtp protocol - host: smtp.example.com - username: sender@example.com - password: senderpassword - to: # a list of notification recipients - - first_recipient@example.com - - second_recipient@example.com \ No newline at end of file diff --git a/example/__init__.py b/example/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/example/config.yaml b/example/config.yaml new file mode 100644 index 0000000..2ba9728 --- /dev/null +++ b/example/config.yaml @@ -0,0 +1,15 @@ +# all fields must be filled. + +mysql: + host: '172.0.0.2' # or '127.0.0.1' + port: 3306 # or 3307 + user: 'root' + password: '' + server_id: 2 # this should be unique. this may be different depending on environment. + log_file: 'mysql-bin.000001' # this may be different depending on environment. + log_pos: 0 + blocking: True # if true, mysql waits for new incoming event to send after pymyelarepl reads all the previous events. + +es: + host: '172.0.0.3' # or '127.0.0.1' + port: 9200 # or 9201 \ No newline at end of file diff --git a/example/run.py b/example/run.py new file mode 100644 index 0000000..873408f --- /dev/null +++ b/example/run.py @@ -0,0 +1,8 @@ +import os + +from pymyelarepl import PyMyElaRepl + + +config_path = os.path.join(os.path.dirname(__file__), 'config.yaml') +pymyelarepl = PyMyElaRepl(config_path) +pymyelarepl.run() \ No newline at end of file diff --git a/pymyelarepl-desc.png b/pymyelarepl-desc.png new file mode 100644 index 0000000..2185435 Binary files /dev/null and b/pymyelarepl-desc.png differ diff --git a/pymyelarepl.Dockerfile b/pymyelarepl.Dockerfile new file mode 100644 index 0000000..c155669 --- /dev/null +++ b/pymyelarepl.Dockerfile @@ -0,0 +1,8 @@ +ARG BASE_IMAGE +FROM ${BASE_IMAGE} + +COPY pymyelarepl pymyelarepl/pymyelarepl +COPY example pymyelarepl/example +COPY test pymyelarepl/test +COPY setup.py pymyelarepl/setup.py +RUN cd pymyelarepl && pip install . 
\ No newline at end of file diff --git a/pymyelarepl/__init__.py b/pymyelarepl/__init__.py new file mode 100644 index 0000000..706f348 --- /dev/null +++ b/pymyelarepl/__init__.py @@ -0,0 +1 @@ +from .pymyelarepl import PyMyElaRepl \ No newline at end of file diff --git a/pymyelarepl/pymyelarepl.py b/pymyelarepl/pymyelarepl.py new file mode 100644 index 0000000..3a84334 --- /dev/null +++ b/pymyelarepl/pymyelarepl.py @@ -0,0 +1,122 @@ +import datetime +import decimal +import json + +import requests +import yaml + +from pymysqlreplication import BinLogStreamReader +from pymysqlreplication.event import XidEvent +from pymysqlreplication.row_event import DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent + + +class PyMyElaRepl: + def get_config_from_file(self, config_path): + try: + with open(config_path) as f: + self.config = yaml.load(f, Loader=yaml.FullLoader) + except IndexError: + raise IndexError('Must specify config file') + except FileNotFoundError: + raise FileNotFoundError('Could not find the config file') + + def __init__(self, config_path): + self.get_config_from_file(config_path) + + self.es_endpoint = 'http://{host}:{port}/_bulk'.format( + host=self.config['es']['host'], + port=self.config['es']['port'] + ) + + self.mysql_conf = dict( + [(key, self.config['mysql'][key]) for key in ['host', 'port', 'user', 'password']] + ) + + self.binlog_stream_reader = BinLogStreamReader( + connection_settings=self.mysql_conf, + server_id=self.config['mysql']['server_id'], + only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent, XidEvent], + log_file=self.config['mysql']['log_file'], + log_pos=self.config['mysql']['log_pos'], + resume_stream=True if self.config['mysql']['log_pos'] != 0 else False, + blocking=self.config['mysql']['blocking'] + ) + + self.if_error = [] + + def send_to_es(self, converted): + resp = requests.post( + url=self.es_endpoint, + data=converted, + verify=False, + headers={'content-type': 'application/json'} + ) + + self.if_error.append(resp.json()['errors']) + print(resp.json()) + + def serialize_not_serializable(self, obj): + if isinstance(obj, datetime.datetime) or isinstance(obj, datetime.date): + return obj.isoformat() + elif isinstance(obj, decimal.Decimal): + return str(obj) + raise TypeError('Type not serializable for obj {obj}'.format(obj=obj)) + + def convert_event_to_valid_es_data_format(self, event): + converted = '' + + for e in event: + meta = json.dumps({e['action']: {'_index': e['index'], '_id': e['id']}}) + + if e['action'] == 'delete': + converted += ''.join([meta, '\n']) + elif e['action'] == 'update': + body = json.dumps({'doc': e['doc']}, default=self.serialize_not_serializable) + converted += ''.join([meta, '\n', body, '\n']) + elif e['action'] == 'create': + body = json.dumps(e['doc'], default=self.serialize_not_serializable) + converted += ''.join([meta, '\n', body, '\n']) + + return converted + + def get_binlog_event(self): + extracted_collection = [] + + for event in self.binlog_stream_reader: + if isinstance(event, XidEvent): + yield extracted_collection + + extracted_collection = [] + continue + + for row in event.rows: + if isinstance(event, DeleteRowsEvent): + extracted = { + 'index': event.table, + 'id': row['values'][event.primary_key], + 'action': 'delete' + } + elif isinstance(event, UpdateRowsEvent): + extracted = { + 'index': event.table, + 'id': row['after_values'][event.primary_key], + 'action': 'update', + 'doc': {k: v for k, v in row['after_values'].items() if k != event.primary_key} + } + elif isinstance(event, WriteRowsEvent): + 
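+                    # A WriteRowsEvent is an INSERT: index the whole row as a new document; the primary-key value becomes the Elasticsearch _id and is dropped from the doc body.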
extracted = { + 'index': event.table, + 'id': row['values'][event.primary_key], + 'action': 'create', + 'doc': {k: v for k, v in row['values'].items() if k != event.primary_key} + } + + extracted_collection.append(extracted) + + self.binlog_stream_reader.close() + print('Info: Mysql connection closed successfully after reading all binlog events.') + + def run(self): + for event in self.get_binlog_event(): + converted = self.convert_event_to_valid_es_data_format(event) + self.send_to_es(converted) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 84be5a5..0000000 --- a/requirements.txt +++ /dev/null @@ -1,6 +0,0 @@ -PyMySQL==0.6.7 -mysql-replication>=0.8 -requests>=2.9.1 -PyYAML>=3.11 -lxml>=3.5.0 -future>=0.15.2 #for py2 compat diff --git a/setup.py b/setup.py index ded9589..ee47d2c 100644 --- a/setup.py +++ b/setup.py @@ -1,27 +1,23 @@ -from setuptools import setup, find_packages -import es_sync +from setuptools import find_packages, setup setup( - name='py-mysql-elasticsearch-sync', - version=es_sync.__version__, - packages=find_packages(), - url='https://github.com/zhongbiaodev/py-mysql-elasticsearch-sync', - license='MIT', - author='Windfarer', - author_email='windfarer@gmail.com', - description='MySQL to Elasticsearch sync tool', - install_requires=[ - 'PyMySQL==0.6.7', - 'mysql-replication==0.9', - 'requests==2.9.1', - 'PyYAML==3.11', - 'lxml==3.5.0', - 'future==0.15.2' - ], - entry_points={ - 'console_scripts': [ - 'es-sync=es_sync:start', - ] - }, - include_package_data=True + name='pymyelarepl', + version='0.2', + packages=find_packages( + include=[ + 'pymyelarepl' + ]), + install_requires = [ + 'certifi==2023.7.22', + 'cffi==1.15.1', + 'charset-normalizer==3.2.0', + 'cryptography==41.0.4', + 'idna==3.4', + 'mysql-replication==0.43.0', + 'pycparser==2.21', + 'PyMySQL==1.1.0', + 'PyYAML==6.0.1', + 'requests==2.31.0', + 'urllib3==2.0.4' + ] ) \ No newline at end of file diff --git a/src/__init__.py b/src/__init__.py deleted file mode 100644 index 0f59283..0000000 --- a/src/__init__.py +++ /dev/null @@ -1,437 +0,0 @@ -from __future__ import print_function, unicode_literals -from future.builtins import str, range -import sys -PY2 = sys.version_info[0] == 2 - -if PY2: - import os - DEVNULL = open(os.devnull, 'wb') -else: - from subprocess import DEVNULL -def encode_in_py2(s): - if PY2: - return s.encode('utf-8') - return s - -import os.path -import yaml -import signal -import requests -import subprocess -import json -import logging -import shlex -from datetime import datetime -from lxml.etree import iterparse -from functools import reduce -from pymysqlreplication import BinLogStreamReader -from pymysqlreplication.row_event import DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent - -__version__ = '0.3.3.1' - - -# The magic spell for removing invalid characters in xml stream. 
-REMOVE_INVALID_PIPE = r'tr -d "\00\01\02\03\04\05\06\07\10\13\14\16\17\20\21\22\23\24\25\26\27\30\31\32\33\34\35\36\37"' - -DEFAULT_BULKSIZE = 100 -DEFAULT_BINLOG_BULKSIZE = 1 - - -class ElasticSync(object): - table_structure = {} - log_file = None - log_pos = None - - @property - def is_binlog_sync(self): - rv = bool(self.log_file and self.log_pos) - return rv - - def __init__(self): - try: - self.config = yaml.load(open(sys.argv[1])) - except IndexError: - print('Error: not specify config file') - exit(1) - - self.dump_cmd = 'mysqldump -h {host} -P {port} -u {user} --password={password} {db} {table} ' \ - '--default-character-set=utf8 -X'.format(**self.config['mysql']) - - self.binlog_conf = dict( - [(key, self.config['mysql'][key]) for key in ['host', 'port', 'user', 'password', 'db']] - ) - - self.endpoint = 'http://{host}:{port}/{index}/{type}/_bulk'.format( - host=self.config['elastic']['host'], - port=self.config['elastic']['port'], - index=self.config['elastic']['index'], - type=self.config['elastic']['type'] - ) # todo: supporting multi-index - - self.mapping = self.config.get('mapping') or {} - if self.mapping.get('_id'): - self.id_key = self.mapping.pop('_id') - else: - self.id_key = None - - record_path = self.config['binlog_sync']['record_file'] - if os.path.isfile(record_path): - with open(record_path, 'r') as f: - record = yaml.load(f) - self.log_file = record.get('log_file') - self.log_pos = record.get('log_pos') - - self.bulk_size = self.config.get('elastic').get('bulk_size') or DEFAULT_BULKSIZE - self.binlog_bulk_size = self.config.get('elastic').get('binlog_bulk_size') or DEFAULT_BINLOG_BULKSIZE - - self._init_logging() - - def _init_logging(self): - logging.basicConfig(filename=self.config['logging']['file'], - level=logging.INFO, - format='[%(levelname)s] %(asctime)s %(message)s') - self.logger = logging.getLogger(__name__) - logging.getLogger("requests").setLevel(logging.WARNING) # disable requests info logging - - def cleanup(*args): - self.logger.info('Received stop signal') - self.logger.info('Shutdown') - sys.exit(0) - - signal.signal(signal.SIGINT, cleanup) - signal.signal(signal.SIGTERM, cleanup) - - def _post_to_es(self, data): - """ - send post requests to es restful api - """ - resp = requests.post(self.endpoint, data=data) - if resp.json().get('errors'): # a boolean to figure error occurs - for item in resp.json()['items']: - if list(item.values())[0].get('error'): - logging.error(item) - else: - self._save_binlog_record() - - def _bulker(self, bulk_size): - """ - Example: - u = bulker() - u.send(None) #for generator initialize - u.send(json_str) # input json item - u.send(another_json_str) # input json item - ... 
- u.send(None) force finish bulk and post - """ - while True: - data = "" - for i in range(bulk_size): - item = yield - if item: - data = data + item + "\n" - else: - break - # print(data) - print('-'*10) - if data: - self._post_to_es(data) - - def _updater(self, data): - """ - encapsulation of bulker - """ - if self.is_binlog_sync: - u = self._bulker(bulk_size=self.binlog_bulk_size) - else: - u = self._bulker(bulk_size=self.bulk_size) - - u.send(None) # push the generator to first yield - for item in data: - u.send(item) - u.send(None) # tell the generator it's the end - - def _json_serializer(self, obj): - """ - format the object which json not supported - """ - if isinstance(obj, datetime): - return obj.isoformat() - raise TypeError('Type not serializable') - - def _processor(self, data): - """ - The action must be one of the following: - create - Create a document only if the document does not already exist. - index - Create a new document or replace an existing document. - update - Do a partial update on a document. - delete - Delete a document. - """ - for item in data: - if self.id_key: - action_content = {'_id': item['doc'][self.id_key]} - else: - action_content = {} - meta = json.dumps({item['action']: action_content}) - if item['action'] == 'index': - body = json.dumps(item['doc'], default=self._json_serializer) - rv = meta + '\n' + body - elif item['action'] == 'update': - body = json.dumps({'doc': item['doc']}, default=self._json_serializer) - rv = meta + '\n' + body - elif item['action'] == 'delete': - rv = meta + '\n' - elif item['action'] == 'create': - body = json.dumps(item['doc'], default=self._json_serializer) - rv = meta + '\n' + body - else: - logging.error('unknown action type in doc') - raise TypeError('unknown action type in doc') - yield rv - - def _mapper(self, data): - """ - mapping old key to new key - """ - for item in data: - if self.mapping: - for k, v in self.mapping.items(): - item['doc'][k] = item['doc'][v] - del item['doc'][v] - # print(doc) - yield item - - def _formatter(self, data): - """ - format every field from xml, according to parsed table structure - """ - for item in data: - for field, serializer in self.table_structure.items(): - if item['doc'][field]: - try: - item['doc'][field] = serializer(item['doc'][field]) - except ValueError as e: - self.logger.error("Error occurred during format, ErrorMessage:{msg}, ErrorItem:{item}".format( - msg=str(e), - item=str(item))) - item['doc'][field] = None - # print(item) - yield item - - def _binlog_loader(self): - """ - read row from binlog - """ - if self.is_binlog_sync: - resume_stream = True - logging.info("Resume from binlog_file: {file} binlog_pos: {pos}".format(file=self.log_file, - pos=self.log_pos)) - else: - resume_stream = False - - stream = BinLogStreamReader(connection_settings=self.binlog_conf, - server_id=self.config['mysql']['server_id'], - only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent], - only_tables=[self.config['mysql']['table']], - resume_stream=resume_stream, - blocking=True, - log_file=self.log_file, - log_pos=self.log_pos) - for binlogevent in stream: - self.log_file = stream.log_file - self.log_pos = stream.log_pos - for row in binlogevent.rows: - if isinstance(binlogevent, DeleteRowsEvent): - rv = { - 'action': 'delete', - 'doc': row['values'] - } - elif isinstance(binlogevent, UpdateRowsEvent): - rv = { - 'action': 'update', - 'doc': row['after_values'] - } - elif isinstance(binlogevent, WriteRowsEvent): - rv = { - 'action': 'index', - 'doc': row['values'] - } - 
else: - logging.error('unknown action type in binlog') - raise TypeError('unknown action type in binlog') - yield rv - # print(rv) - stream.close() - raise IOError('mysql connection closed') - - def _parse_table_structure(self, data): - """ - parse the table structure - """ - for item in data.iter(): - if item.tag == 'field': - field = item.attrib.get('Field') - type = item.attrib.get('Type') - if 'int' in type: - serializer = int - elif 'float' in type: - serializer = float - elif 'datetime' in type: - if '(' in type: - serializer = lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S.%f') - else: - serializer = lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S') - elif 'char' in type: - serializer = str - elif 'text' in type: - serializer = str - else: - serializer = str - self.table_structure[field] = serializer - - def _parse_and_remove(self, f, path): - """ - snippet from python cookbook, for parsing large xml file - """ - path_parts = path.split('/') - doc = iterparse(f, ('start', 'end'), recover=False, encoding='utf-8', huge_tree=True) - # Skip the root element - next(doc) - tag_stack = [] - elem_stack = [] - for event, elem in doc: - if event == 'start': - tag_stack.append(elem.tag) - elem_stack.append(elem) - elif event == 'end': - if tag_stack == path_parts: - yield elem - elem_stack[-2].remove(elem) - if tag_stack == ['database', 'table_structure']: # dirty hack for getting the tables structure - self._parse_table_structure(elem) - elem_stack[-2].remove(elem) - try: - tag_stack.pop() - elem_stack.pop() - except IndexError: - pass - - def _xml_parser(self, f_obj): - """ - parse mysqldump XML streaming, convert every item to dict object. 'database/table_data/row' - """ - for row in self._parse_and_remove(f_obj, 'database/table_data/row'): - doc = {} - for field in row.iter(tag='field'): - k = field.attrib.get('name') - v = field.text - doc[k] = v - yield {'action': 'index', 'doc': doc} - - def _save_binlog_record(self): - if self.is_binlog_sync: - with open(self.config['binlog_sync']['record_file'], 'w') as f: - logging.info("Sync binlog_file: {file} binlog_pos: {pos}".format( - file=self.log_file, - pos=self.log_pos) - ) - yaml.safe_dump({"log_file": self.log_file, "log_pos": self.log_pos}, f, default_flow_style=False) - - def _xml_dump_loader(self): - mysqldump = subprocess.Popen( - shlex.split(encode_in_py2(self.dump_cmd)), - stdout=subprocess.PIPE, - stderr=DEVNULL, - close_fds=True) - - remove_invalid_pipe = subprocess.Popen( - shlex.split(encode_in_py2(REMOVE_INVALID_PIPE)), - stdin=mysqldump.stdout, - stdout=subprocess.PIPE, - stderr=DEVNULL, - close_fds=True) - - return remove_invalid_pipe.stdout - - def _xml_file_loader(self, filename): - f = open(filename, 'rb') # bytes required - return f - - def _send_email(self, title, content): - """ - send notification email - """ - if not self.config.get('email'): - return - - import smtplib - from email.mime.text import MIMEText - - msg = MIMEText(content) - msg['Subject'] = title - msg['From'] = self.config['email']['from']['username'] - msg['To'] = ', '.join(self.config['email']['to']) - - # Send the message via our own SMTP server. 
- s = smtplib.SMTP() - s.connect(self.config['email']['from']['host']) - s.login(user=self.config['email']['from']['username'], - password=self.config['email']['from']['password']) - s.sendmail(msg['From'], msg['To'], msg=msg.as_string()) - s.quit() - - def _sync_from_stream(self): - logging.info("Start to dump from stream") - docs = reduce(lambda x, y: y(x), [self._xml_parser, - self._formatter, - self._mapper, - self._processor], - self._xml_dump_loader()) - self._updater(docs) - logging.info("Dump success") - - def _sync_from_file(self): - logging.info("Start to dump from xml file") - logging.info("Filename: {}".format(self.config['xml_file']['filename'])) - docs = reduce(lambda x, y: y(x), [self._xml_parser, - self._formatter, - self._mapper, - self._processor], - self._xml_file_loader(self.config['xml_file']['filename'])) - self._updater(docs) - logging.info("Dump success") - - def _sync_from_binlog(self): - logging.info("Start to sync binlog") - docs = reduce(lambda x, y: y(x), [self._mapper, self._processor], self._binlog_loader()) - self._updater(docs) - - def run(self): - """ - workflow: - 1. sync dump data - 2. sync binlog - """ - try: - if not self.is_binlog_sync: - if len(sys.argv) > 2 and sys.argv[2] == '--fromfile': - self._sync_from_file() - else: - self._sync_from_stream() - self._sync_from_binlog() - except Exception: - import traceback - logging.error(traceback.format_exc()) - self._send_email('es sync error', traceback.format_exc()) - raise - - -def start(): - instance = ElasticSync() - instance.run() - -if __name__ == '__main__': - start() diff --git a/src/sample.yaml b/src/sample.yaml deleted file mode 100644 index 8756f82..0000000 --- a/src/sample.yaml +++ /dev/null @@ -1,45 +0,0 @@ -# The mysql database which you want to sync -mysql: - host: 127.0.0.1 - port: 3306 - user: foo - password: bar - db: mydb - table: mytable - server_id: 1 # this should be unique - -elastic: - host: 127.0.0.1 - port: 9200 - bulk_size: 200 # the update bulk size when mysqldump, default is 100 if not specified - binlog_bulk_size: 10 # the update bulk size when syncing binlog, default is 1 if not specified - index: article - type: article - -# path to your own xml file, if you want to initialize dump from xml file. run with argument --fromfile in command -xml_file: - filename: a.xml - -# If you want to map your column, put the column name as the value, and es field name as the key, -# Particularly , if you set _id as follows, it will use myid column as the index doc's id, or ES will generate an id as default -mapping: - _id: myid - es_field_name: mysql_column_name - -# The log file's path -logging: - file: mylog.log - -# The record file's path, which record the latest synced binlog file and position -binlog_sync: - record_file: binlog.info - -# If you want to email notification when error occurs, fill this -email: - from: # the sender's email, uses smtp protocol - host: smtp.example.com - username: sender@example.com - password: senderpassword - to: # a list of notification recipients - - first_recipient@example.com - - second_recipient@example.com \ No newline at end of file diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/config.yaml b/test/config.yaml new file mode 100644 index 0000000..e4af123 --- /dev/null +++ b/test/config.yaml @@ -0,0 +1,16 @@ +# all fields must be filled. 
+ +mysql: + host: '172.0.0.2' # or '127.0.0.1' + port: 3306 # or 3307 + user: 'root' + password: '' + db: 'test_db_for_pymyelarepl' + server_id: 3 # this should be unique. this may be different depending on environment. + log_file: 'mysql-bin.000001' # this shoule be different depending on environment. + log_pos: 0 + blocking: False # if true, mysql waits for new incoming event to send after pymyelarepl reads all the previous events. + +es: + host: '172.0.0.3' # or '127.0.0.1' + port: 9200 # or 9201 \ No newline at end of file diff --git a/test/test_basic.py b/test/test_basic.py new file mode 100644 index 0000000..30b28ca --- /dev/null +++ b/test/test_basic.py @@ -0,0 +1,77 @@ +import os + +import pymysql +import requests +import unittest +import yaml + +from pymyelarepl import PyMyElaRepl + + +class BasicTestCase(unittest.TestCase): + def execute(self, query): + cursor = self.conn_control.cursor() + cursor.execute(query) + return cursor + + def setUp(self): + config_path = os.path.join(os.path.dirname(__file__), 'config.yaml') + + with open(config_path) as f: + self.config = yaml.load(f, Loader=yaml.FullLoader) + + mysql_config = { + "host": self.config['mysql']['host'], + "user": self.config['mysql']['user'], + "passwd": self.config['mysql']['password'], + "port": self.config['mysql']['port'], + "use_unicode": True, + "charset": "utf8", # regarded as utf8mb4 + } + + self.conn_control = pymysql.connect(**mysql_config) + self.execute("DROP DATABASE IF EXISTS {db}".format(db=self.config['mysql']['db'])) + self.execute("CREATE DATABASE {db}".format(db=self.config['mysql']['db'])) + self.execute("USE {db}".format(db=self.config['mysql']['db'])) + self.execute("RESET MASTER") + + self.es_url_for_all_data = 'http://{host}:{port}/_all'.format( + host=self.config['es']['host'], + port=self.config['es']['port'] + ) + + self.pymyelarepl = PyMyElaRepl(config_path) + + def test_basic_replication(self): + self.execute( + """ + CREATE TABLE basic_replication( + id INT PRIMARY KEY AUTO_INCREMENT, + f FLOAT, + t TIMESTAMP) + """ + ) + + self.execute("INSERT INTO basic_replication(id, f, t) VALUES(1, 12.34, '2023-09-25 00:00:00')") + self.execute("INSERT INTO basic_replication(id, f, t) VALUES(2, 12.34, '2023-09-25 00:00:00')") + self.conn_control.commit() + + self.execute("UPDATE basic_replication SET f=56.78 WHERE id=1") + self.execute("UPDATE basic_replication SET f=56.78 WHERE id=2") + self.conn_control.commit() + + self.execute("DELETE FROM basic_replication WHERE id=1") + self.execute("DELETE FROM basic_replication WHERE id=2") + self.conn_control.commit() + + self.pymyelarepl.run() + if_error = True if True in self.pymyelarepl.if_error else False + self.assertEqual(if_error, False) + + def tearDown(self): + self.execute("DROP DATABASE IF EXISTS {db}".format(db=self.config['mysql']['db'])) + self.execute("RESET MASTER") + self.conn_control.close() + requests.delete(self.es_url_for_all_data) + +unittest.main() \ No newline at end of file diff --git a/upstart.conf b/upstart.conf deleted file mode 100644 index 9f5054a..0000000 --- a/upstart.conf +++ /dev/null @@ -1,12 +0,0 @@ -description 'es sync' -start on runlevel [2345] -stop on runlevel [06] - -respawn -normal exit 0 - -chdir - -script - es-sync config.yaml -end script \ No newline at end of file