需求
有一個表,里面數據量比較大,每天一更新,其字段可以通過xml配置文件進行配置,即,可能每次建表的字段不一樣。
上游跑時會根據配置從源文件中提取,到入庫這一步需要根據配置進行建表。
解決
寫了一個簡單的xml,配置需要字段及類型
上游讀取到對應的數據
入庫這一步,先把原表刪除,根據配置建新表
XML文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
<?xml version="1.0" encoding="UTF-8"?>
<!-- 表名,數據庫名 可靈活配置插入哪個庫哪個表 -->
<table name="top_query" db_name="evaluting_sys">
    <!-- 非業務主鍵,自增長,可配名,其他 INTEGER UNSIGNED AUTO_INCREMENT -->
    <primary_key>
        <name>id</name>
    </primary_key>
    <!-- 字段開始 -->
    <field>
        <name>query</name>
        <type>varchar(200)</type>
        <is_index>false</is_index>
        <description>query</description>
    </field>
    <field>
        <name>pv</name>
        <type>integer</type>
        <is_index>false</is_index>
        <description>pv</description>
    </field>
    <field>
        <name>avg_money</name>
        <type>integer</type>
        <is_index>false</is_index>
        <description></description>
    </field>
    <!-- 字段配置結束 -->
</table>
處理腳本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
#!/usr/bin/python
# -*- coding:utf-8 -*-
# author: wklken
# desc: use to read db xml config.
# -----------------------
# 2012-02-18 created
# ----------------------
import os
import sys
from xml.dom import Node, minidom


def _first_text(parent, tag_name):
    """Return the text of the first ``tag_name`` element found under ``parent``."""
    return parent.getElementsByTagName(tag_name)[0].childNodes[0].nodeValue


def read_dbconfig_xml(xml_file_path):
    """Build the SQL statements described by a table-definition XML file.

    The first ``<table>`` element supplies the table name (``name``
    attribute) and database name (``db_name`` attribute).  Its
    ``<primary_key>`` child names the auto-increment key column and every
    ``<field>`` child contributes one ``name type`` column.  The
    ``<is_index>`` and ``<description>`` children of a field are not used.

    Args:
        xml_file_path: path of the XML configuration file.

    Returns:
        A dict with three SQL strings: ``db_sql`` (create/select the
        database), ``table_sql`` (drop the old table) and
        ``table_create_sql`` (create the new table).  The dict is also
        printed, as the original script did.

    Exits the process with status 1 when either attribute is missing or
    empty.
    """
    root = minidom.parse(xml_file_path)
    table = root.getElementsByTagName("table")[0]

    # Read the database name and the table name.
    table_name = table.getAttribute("name")
    db_name = table.getAttribute("db_name")
    if not (table_name and db_name):
        print("Error:attribute is not define well! db_name=" + db_name
              + " ;table_name=" + table_name)
        sys.exit(1)

    content = {
        "db_sql": "create database if not exists `" + db_name + "`; use " + db_name + ";",
        # MySQL syntax is "DROP TABLE IF EXISTS <name>".
        "table_sql": "drop table if exists " + table_name + ";",
    }

    # The surrogate primary key always comes first.
    primary_key = table.getElementsByTagName("primary_key")[0]
    columns = [_first_text(primary_key, "name")
               + " INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY"]

    # Ordinary fields; joining afterwards avoids a dangling comma when the
    # configuration lists no <field> at all.
    for field in table.getElementsByTagName("field"):
        columns.append(_first_text(field, "name") + " " + _first_text(field, "type"))

    content["table_create_sql"] = (
        "create table " + table_name + "(" + ",".join(columns) + ");"
    )

    print(content)
    return content


if __name__ == "__main__":
    read_dbconfig_xml(sys.argv[1])
PYTHON解析XML大文件[SAX]
需求
讀取xml數據文件,文件較大,需要實時處理插入到數據庫
xml文檔
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
<PERSONS>
    <person>
        <id>100000</id>
        <sex>男</sex>
        <address>北京,海淀區</address>
        <fansNum>437</fansNum>
        <summary>1989</summary>
        <wbNum>333</wbNum>
        <gzNum>242</gzNum>
        <blog>null</blog>
        <edu>大學</edu>
        <work></work>
        <renZh>1</renZh>
        <brithday>2月14日</brithday>
    </person>
</PERSONS>
處理
sax處理時并不會像dom一樣可以以類似節點的維度進行讀取,它只有 開始標簽 內容 結束標簽 之分
處理思想是:通過一個handler,對開始標簽,內容,結束標簽各有一個處理函數
代碼及注解
person 處理類
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
from xml.sax import handler, parseString


class PersonHandler(handler.ContentHandler):
    """SAX handler that turns every ``<person>`` element into one insert.

    Text found between a start tag and the next end tag is stored in
    ``self.person`` under the tag name.  When ``</person>`` is reached the
    collected values are formatted into the insert statement and handed to
    ``db_ops.insert(sql)``.

    Args:
        db_ops: object exposing ``insert(sql)``.
        sql_template: ``%``-style insert template with one placeholder per
            field.  When omitted, the module-level name ``in_sql`` is used
            (expected to be defined by the surrounding script).
        field_names: ordered field names matching the template.  When
            omitted, the module-level name ``fields`` is used.
    """

    def __init__(self, db_ops, sql_template=None, field_names=None):
        handler.ContentHandler.__init__(self)
        # Database helper object.
        self.db_ops = db_ops
        self.sql_template = sql_template
        self.field_names = field_names
        # Values of the person currently being read, keyed by tag name.
        self.person = {}
        # Name of the most recently opened tag.
        self.current_tag = ""
        # Non-zero while we are between a start tag and the next end tag, so
        # whitespace between sibling elements is not captured.
        self.in_quote = 0
        # Text pieces received for the current tag.
        self._chunks = []

    def startElement(self, name, attr):
        """Remember the opened tag; a new ``<person>`` resets the record."""
        if name == "person":
            self.person = {}
        self.current_tag = name
        self.in_quote = 1
        self._chunks = []

    def endElement(self, name):
        """Store the collected text; on ``</person>`` emit the insert."""
        if self.in_quote:
            self.person[self.current_tag] = "".join(self._chunks)
        self.in_quote = 0

        if name == "person":
            template = self.sql_template if self.sql_template is not None else in_sql
            names = self.field_names if self.field_names is not None else fields
            # NOTE(review): values are interpolated into the statement without
            # escaping, so a double quote inside the XML text breaks the SQL.
            in_fields = tuple(['"' + self.person.get(i, "") + '"' for i in names])
            sql = template % in_fields
            print(sql)
            self.db_ops.insert(sql)

    def characters(self, content):
        """Collect text for the current tag.

        The parser may deliver one text node in several calls, so the pieces
        are accumulated and joined in ``endElement`` instead of overwriting
        each other.
        """
        if self.in_quote:
            self._chunks.append(content)
加上入庫的完整代碼
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
#!/usr/bin/python
# -*- coding:utf-8 -*-
# parse_person.py
# version : 0.1
# author  : [email protected]
# desc    : parse person.xml and insert every <person> record into MySQL
import codecs
import os
import sys
from xml.sax import handler, parse, parseString

import MySQLdb

if sys.version_info[0] == 2:
    # Python 2 only: keep the original default-encoding switch so implicit
    # str/unicode conversions use UTF-8.
    reload(sys)
    sys.setdefaultencoding('utf-8')

# Column order of the person table; the XML child tags use the same names.
fields = ("id", "sex", "address", "fansNum", "summary", "wbNum", "gzNum",
          "blog", "edu", "work", "renZh", "brithday")

# Parameterized insert with one %s placeholder per entry in ``fields``.
in_sql = "insert into person(%s) values(%s)" % (
    ",".join(fields), ",".join(["%s"] * len(fields)))


class Db_Connect(object):
    """Small wrapper around a MySQLdb connection."""

    def __init__(self, db_host, user, pwd, db_name, charset="utf8", use_unicode=True):
        """Open the connection; the password is deliberately not printed."""
        print("init begin")
        print("%s %s %s %s %s" % (db_host, user, db_name, charset, use_unicode))
        self.conn = MySQLdb.Connection(db_host, user, pwd, db_name,
                                       charset=charset, use_unicode=use_unicode)
        print("init end")

    def insert(self, sql, params=None):
        """Execute ``sql`` and return what ``cursor.execute`` returns.

        Args:
            sql: statement text, optionally containing ``%s`` placeholders.
            params: values for the placeholders; the driver escapes them.

        Returns:
            The result of ``cursor.execute``, or None when a
            ``MySQLdb.Warning`` was raised (it is reported, not re-raised).
        """
        cursor = self.conn.cursor()
        try:
            return cursor.execute(sql, params)
        except MySQLdb.Warning as e:
            print("Error: execute sql '%s' failed: %s" % (sql, e))
        finally:
            cursor.close()

    def commit(self):
        """Commit the pending inserts."""
        self.conn.commit()

    def close(self):
        """Close the underlying connection."""
        self.conn.close()


class PersonHandler(handler.ContentHandler):
    """SAX handler that inserts one row per ``<person>`` element.

    Text found between a start tag and the next end tag is stored in
    ``self.person`` under the tag name; on ``</person>`` the values listed
    in ``fields`` are passed to ``db_ops.insert`` as query parameters.
    """

    def __init__(self, db_ops):
        handler.ContentHandler.__init__(self)
        # Database helper object.
        self.db_ops = db_ops
        # Values of the person currently being read, keyed by tag name.
        self.person = {}
        # Name of the most recently opened tag.
        self.current_tag = ""
        # Non-zero while we are between a start tag and the next end tag.
        self.in_quote = 0
        # Text pieces received for the current tag.
        self._chunks = []

    def startElement(self, name, attr):
        """Remember the opened tag; a new ``<person>`` resets the record."""
        if name == "person":
            self.person = {}
        self.current_tag = name
        self.in_quote = 1
        self._chunks = []

    def endElement(self, name):
        """Store the collected text; on ``</person>`` insert the record."""
        if self.in_quote:
            self.person[self.current_tag] = "".join(self._chunks)
        self.in_quote = 0

        if name == "person":
            values = tuple(self.person.get(key, "") for key in fields)
            self.db_ops.insert(in_sql, values)

    def characters(self, content):
        """Collect text for the current tag.

        The parser may deliver one text node in several calls, so the pieces
        are accumulated and joined in ``endElement``.
        """
        if self.in_quote:
            self._chunks.append(content)


def main():
    """Stream ./person.xml through the SAX handler into the database."""
    db_ops = Db_Connect("127.0.0.1", "root", "root", "test")
    try:
        with open("./person.xml", "rb") as raw:
            # The source file is GBK: recode it to UTF-8 on the fly so the
            # file is never loaded into memory as a whole.  For a UTF-8
            # source, pass ``raw`` to parse() directly.
            parse(codecs.EncodedFile(raw, "utf-8", "gbk"), PersonHandler(db_ops))
        db_ops.commit()
    finally:
        db_ops.close()


if __name__ == "__main__":
    main()