使用Python和MongoDB处理CSV数据

7/12/2015

有时候我们需要处理excel或者其他的数据源的数据,这些数据可能是保存在csv文件中。我们一般将其转换为JSON数据格式后,导入到数据库,做数据统计分析使用。数据转换的过程,可使用openRefine等工具或者自己写Python程序定义转换,这个过程就是数据的ETL(extract,transform,and load)过程。本节主要是通过一个简单的例子来讲解如何使用MongoDB和Python来处理数据。

首先如果你还没有接触过CSV格式的数据,以下是一个很好的例子。定义了北京地区的空气质量情况的数据。我们的目的是对于这些数据做一些统计分析工作。

updatetime,AreaName,jczname,AQI,PM2_5,O3,PM10,SO2,NO2,CO
2013-02-01 00:00:00,北京市,万寿西宫,186,94,2,—,108,79,5.000
2013-02-01 00:00:00,北京市,定陵,158,69,4,—,110,100,3.600
2013-02-01 00:00:00,北京市,东四,183,100,2,—,117,97,5.400
2013-02-01 00:00:00,北京市,天坛,179,84,11,—,64,67,5.200
2013-02-01 00:00:00,北京市,农展馆,215,97,9,—,119,97,5.200

一般CSV第一行包含的是描述信息,代表了每一列数据的意义。这样在下一步的数据处理的时候就会有针对性的处理自己想要的数据。

####转换数据

转换数据一般分为两种方式,一种使用现有的工具转换,另一种则是自己去写一些简短的代码了转换数据。这里分别对于两种数据转换方法进行解释说明。

#####使用Python转换数据 如果使用Python直接进行文本数据的转换,分别取每一行的数据然后将其进行转换。这里使用NumPy进行操作,提前使用pip或者easy_install 安装对应模块到系统中。

#Part-One
#encoding:utf-8    

# 导入numpy库进行数据的处理
import numpy as np    
#我们只对PM2_5数据列感兴趣
airData=np.genfromtxt("air.csv",skip_header=1,usecols=(0,1,2,4),dtype=None,delimiter=',')
airDataName=["Date","City","Distict","PM2_5"]
airDataJson=[]
for data in airData:
    airData_List=zip(airDataName,data)
    airDataJson.append(dict(airData_List))
print airDataJson

这段代码只是简单的将csv文件转换为json格式的数据,并修改了keyName. 转换后的格式如下:

[{'Date': '2013-02-01 00:00:00', 'City': '北京市', 'PM2_5': '94', 'Distict': '万寿西宫'}, 
 {'Date': '2013-02-01 00:00:00', 'City': '北京市', 'PM2_5': '69', 'Distict': '定陵'}
 ... 
]

此时的数据已经可以直接插入到数据库中了。

#####使用现有工具进行转换

这里推荐大家使用的是OpenRefine工具进行数据的转换。这套软件是一套Google的开源软件,可以运行在自己的机器上,做数据的清洗和数据的转换,方便简单,功能强大,这里只是使用其中的小部分数据转换功能。另外OpenRefine基于Java环境,因此是跨平台的一套软件。OpenRefine官方网址

下载安装之后即可在个人的浏览器中打开并运行操作。

image

详细的使用说明可以参考openrefine使用说明

  1. 导入数据文件
  2. 创建项目
  3. 选中需要修改的列名称,点击后选择Edit Column,选择Rename this column,或者删除该列
  4. 导出数据:点击Export|Templating,点击Export导出

这就完成了所有的数据转换功能,是不是很简单。 导出后的数据格式如下:

 {
  "rows" : [
    {
      "Date" : "2013-02-01 00:00:00",
      "City" : "北京市",
      "District" : "万寿西宫",
      "PM2_5" : 94
    },
    {
      "Date" : "2013-02-01 00:00:00",
      "City" : "北京市",
      "District" : "定陵",
      "PM2_5" : 69
    },
...
  ]
}

####导入数据

这里延续使用python的方式编写程序将数据插入到MongoDB数据库。这里我们需要使用的python模块是pymongo。使用pip或者easy_install 安装对应模块到系统中。

#Part-Two
...
from pymongo import MongoClient
import json 

con=MongoClient()
db=con.blog
airCollection=db.airCollection
airCollection.insert(airDataJson)

#如果使用openRefine产生的文件则通过下面的代码导    
# with open("air.txt") as f:
#     data=json.loads(f.read())
# airCollection.insert(data["rows"])

####如何统计数据

一旦我们将数据放到mongoDB中,就可以使用mongoDB强大的处理能力高效的完成数据的归档,分类,聚合等操作。这里我们先来做一些简单的分组操作。

首先看下导入到数据库的格式是否正确:

> db.airCollection.findOne()
{
    "_id" : ObjectId("55a20bb732fb760a67a9e918"),
    "Date" : "2013-02-01 00:00:00",
    "City" : "北京市",
    "Distict" : "万寿西宫",
    "PM2_5" : "94"
}
> db.airCollection.count()
273    

#####MongoDB聚合函数

下面我们会接触到的是一些常规的聚合函数,这些函数在内部使用c++来处理bson格式的数据.运行效率比较高,支持串行操作,以下是一些聚合函数的定义。

  1. $match: 使用查询语句来过滤数据集合,返回过滤后的数据给下一个操作
  2. $group: 通过id来对数据进行分组,并支持所有的计算操作$max,$sum,等
  3. $sort: 对数据进行排序操作
  4. $skip: 跳过某些符合要求的数据内容
  5. $limit: 将数据集合限制在特定的范围内
  6. $unwind: 将一个符合要求的数据进行拆分,输入条件必须是一个数组的名称。
  7. $project: 提取想要的列数据,忽略其他的数据

管道的使用就如下图所示:

image

######全数据统计

下面我们统计的是在这24小时内每个地区的平均PM2.5的值: 统计代码及输出内容:

result=airCollection.aggregate([{"$group":{"_id":"$District","avg":{"$avg":"$PM2_5"},"count":{"$sum":1}}}])
for doc in result:
   print doc

数据输出为:

{ "_id" : "古城", "avg" : 35.25, "count" : 24 }
{ "_id" : "农展馆", "avg" : 37.083333333333336, "count" : 25 }
{ "_id" : "昌平镇", "avg" : 30.5, "count" : 24 }
{ "_id" : "顺义新城", "avg" : 35.52, "count" : 25 }
{ "_id" : "怀柔镇", "avg" : 36.44, "count" : 25 }
{ "_id" : "海淀区万柳", "avg" : 33.16, "count" : 25 }
{ "_id" : "官园", "avg" : 34.44, "count" : 25 }
{ "_id" : "万寿西宫", "avg" : 32.96, "count" : 25 }
{ "_id" : "定陵", "avg" : 47.9, "count" : 25 }
{ "_id" : "天坛", "avg" : 26.28, "count" : 25 }
{ "_id" : "东四", "avg" : 32.25, "count" : 25 }

######删除不必要的统计项目 如果数据是完整的,统计结果也许没什么问题,但是我们的数据是不干净的,里面有些地区在获取PM2.5时候并未得到正确的值 使用了“-“代替空值,我们要将此数据过滤掉,所以需要前置一个匹配符号:

   result=airCollection.aggregate([{"$match":{"PM2_5":{"$ne":"―"}}},{"$group":{"_id":"$District","count":{"$avg":"$PM2_5"}}}])

数据输出为:

{ "_id" : "古城", "avg" : 35.25, "count" : 24 }
{ "_id" : "农展馆", "avg" : 37.083333333333336, "count" : 24 }
{ "_id" : "昌平镇", "avg" : 30.5, "count" : 24 }
{ "_id" : "顺义新城", "avg" : 35.52, "count" : 25 }
{ "_id" : "怀柔镇", "avg" : 36.44, "count" : 25 }
{ "_id" : "海淀区万柳", "avg" : 33.16, "count" : 25 }
{ "_id" : "官园", "avg" : 34.44, "count" : 25 }
{ "_id" : "万寿西宫", "avg" : 32.96, "count" : 25 }
{ "_id" : "定陵", "avg" : 47.9, "count" : 10 }
{ "_id" : "天坛", "avg" : 26.28, "count" : 25 }
{ "_id" : "东四", "avg" : 32.25, "count" : 24 }

######管道操作 另外我们还可以使用sort或者limit来统计前10名的数据量

result=airCollection.aggregate([{"$match":{"PM2_5":{"$ne":"―"}}},{"$project":{"Date":1,"District":1,"PM2_5":1,"_id":0}},{"$sort":{"PM2_5":-1}},{"$limit":10}])

数据输出为:

{ "Date" : "2013-02-01 00:00:00", "PM2_5" : 154, "District" : "顺义新城" }
{ "Date" : "2013-02-01 00:00:00", "PM2_5" : 154, "District" : "顺义新城" }
{ "Date" : "2013-02-01 02:00:00", "PM2_5" : 127, "District" : "农展馆" }
{ "Date" : "2013-02-01 02:00:00", "PM2_5" : 127, "District" : "农展馆" }
{ "Date" : "2013-02-01 00:00:00", "PM2_5" : 126, "District" : "怀柔镇" }
{ "Date" : "2013-02-01 00:00:00", "PM2_5" : 126, "District" : "怀柔镇" }
{ "Date" : "2013-02-01 01:00:00", "PM2_5" : 120, "District" : "怀柔镇" }
{ "Date" : "2013-02-01 01:00:00", "PM2_5" : 120, "District" : "怀柔镇" }
{ "Date" : "2013-02-01 01:00:00", "PM2_5" : 115, "District" : "顺义新城" }
{ "Date" : "2013-02-01 01:00:00", "PM2_5" : 115, "District" : "顺义新城" }

以上就是使用MongoDB和Python简单的数据处理工作,如果需要更加复杂的操作,map-reduce或许能做的更多一些。如果感兴趣的话可自行查阅相关文档。这部分内容我会在下一篇mongoDB的文章中讲述。


MongoDB Python 页面已被访问2112次

发表评论