用python操作clickhouse

用python操作clickhouse

前阵子小老板让远程将一台服务器的数据上传到clickhouse上,由于ClickHouse官方只支持JDBC和ODBC,这里采用第三方的python包,clickhouse_driver。

网上很多博客关于这个包的使用都是错误的,orz果然直接看官网文档是最好的方法。

这里记录一些clickhouse_drvier的简单操作。

###关于clickhouse

这里只简单介绍一下,是一个开源的列式数据库(DBMS),主要用于在线分析处理查询(OLAP),其主要特点是有丰富的表引擎。

###clickhouse-drvier执行语句

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from clickhouse_driver import Client

# 这里的execute不能执行插入语句,提供给select、drop等语句使用
def execute_sql(sql):
# 注意clickhouse所在server和data所在的server不一样
# 访问11的服务不需要11的账号和密码,这里是clickhouse的
up_ip = "your_ip"
up_port = 9001 # clickhouse有两个端口9001和8123
ch_username = "default"
ch_password = "your_password"
ch_database = "test"
client = Client(host=up_ip, port=up_port,
user=ch_username, database=ch_database, password=ch_password)
ch_sql = sql
try:
client.execute(ch_sql, types_check=True)
except Exception as error_message:
print("fail to execute sql")
print(error_message)

###插入表

上传语句会特殊一点,不能直接使用sql语句封装后插入,需要单独提供数据并使用VALUES子句结束。

如果直接使用execute_sql(“插入语句”)会遇上超时的错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def up_clickhouse(up_values):
up_ip = "服务器ip"
up_port = 9001
ch_username = "default"
ch_password = "clickhouse的password"
ch_database = "test"
client = Client(host=up_ip, port=up_port,
user=ch_username, database=ch_database, password=ch_password)
try:
client.execute(
'insert into test.m2 values',
up_values
)
except Exception as error_message:
print("fail to insert data")
print(error_message)

#####JSON格式可以这样插入

1
2
3
4
5
ch_sql = "insert into test.m2 format JSONEachRow "
client.execute(
ch_sql,
line_dic
)

删除

1
2
3
def del_table(table_name="m2"):
ch_sql = "drop table if exists test.{}".format(table_name)
execute_sql(ch_sql)

创建

clickhouse的table在创建的时候需要选择引擎,引擎决定了数据的存储方式,支持的查询等等各项不同的功能,这里不一一介绍,可以去[ClickHouse文档][clickhouse.yandex/docs/zh]看。

对于大多数正式的任务,应该使用MerfeTree族中的引擎。

一般用于测试的引擎可以考虑Memory,它会将数据存储在RAM中。

1
2
3
4
5
6
7
8
9
10
def create_table_separate(table_name="m2"):
ch_sql = """CREATE TABLE test.{} (\
...,\
字段名 类型
...
)ENGINE = MergeTree PARTITION BY toYYYYMMDD(toDate(found_time))\
ORDER BY (字段名)\
SETTINGS index_granularity = 8192""".format(table_name)
execute_sql(ch_sql)
print("Create table {}".format(table_name))

连接远程服务器

1
2
3
4
5
6
7
8
9
10
11
def load_RemoteServer(host_ip, host_port, host_username, host_password, remote_path, start, end):
client = paramiko.SSHClient()
try:
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(host_ip, host_port, host_username, host_password, timeout=5)
sftp_client = client.open_sftp()
remote_file = sftp_client.open(remote_path, 'r')
except:
...
finally:
client.close()

####总结

这里只是简单介绍了一下clickhouse-drvier的用法,便于自己以后的记忆,更深层的理解,还需要查阅官方文档。

这是我的第一篇博客,虽然只是些复制粘贴的工作orz

希望自己以后会多写博客,记录自己的成长。

参考资料

[clickhouse官方文档][clickhouse.yandex/docs/zh/]

[clickhouse-drvier文档][clickhouse-driver.readthedocs.io/en/latest/index.html]