本文将展示如何利用Python天生的数据来填充数据库。
我还将向你展示如何利用Neo4j沙箱,这样就可以利用不同的Neo4j数据库设置。

运用Python在Neo4j中创建图数据库_数据_节点 智能问答

可以在这里找到一个谷歌Colab条记本:https://colab.research.google.com/drive/1J9__HotNoINHpucoipLH-4qWc48GALAk?usp=sharing

里面有这篇文章的代码。
(那本条记本上有如何连接Colab和Kaggle的解释,可以让你更快地***数据。
)

必要的工具Neo4j Python驱动程序(撰写本文时为4.2版)jupiter notebook/Lab或谷歌Colab(可选)pandas利用Python清理数据

现在我们可以开始用Python做一些数据处理了。

为了写这篇文章,我们将利用在Kaggle上找到的arXiv数据集,个中包含超过170万篇STEM学术论文。
(在写这篇文章的时候,已经是第18版了。
)你可以将数据***到本地机器

https://www.kaggle.com/Cornell-University/arxiv

现在进入条记本,我们可以开始查看数据。
我通过以下办法加载数据:

file="./arxiv-metadata-oai-snapshot.json"metadata=[]lines=100000#100k测试withopen(file,'r')asf:forlineintqdm(f):metadata.append(json.loads(line))lines-=1iflines==0:breakdf=pd.DataFrame(metadata)

(你不必利用tqdm,但我创造在文件大小超过179万个条款时检讨进度很有帮助。
)

可以通过df看到,我们的数据构造为:

idobjectsubmitterobjectauthorsobjecttitleobjectcommentsobjectjournal-refobjectdoiobjectreport-noobjectcategoriesobjectlicenseobjectabstractobjectversionsobjectupdate_dateobjectauthors_parsedobject

假设我们想用这个数据框构建一个图,我们想知道哪些作者揭橥了哪些论文,以及这些论文与哪些种别干系联。

然后,我们希望有三种不同的节点类型与之对应:作者、论文和种别。

每个节点类型都有一两个属性。
对付作家来说,有作者的名字。
论文可以有ID和标题。
末了,种别有自己的名称。
我们也有一些关系:作者和作者,论文和论文。

因此,我们的目标是拥有以下数据模型(用arrows.app绘制):

有一些列对我们很有用。
例如,我打算保留id,这样我们就可以利用它作为每个论文的唯一索引。
之后,我想要得到每个作者的个人列表。
此外,authors_parsed列为我们供应了一个更清晰的所有作者列表。
当然,我们将保留标题栏作为论文的紧张属性。
末了,我想保留categories列。

下一步是轻微清理一下数据,这样数据帧的每行有一个作者,每行有一个种别。
例如,我们看到authors_parsed列给出了一个列表,个中每个条款在名称后面都有一个多余的逗号。

如果我们大略地将其导入到数据库中,我们将得到author节点,如(显示一个小示例):

╒════════════════════════════════════╕│"n"│╞════════════════════════════════════╡│{"name":["Balázs","C.",""]}│├────────────────────────────────────┤│{"name":["Berger","E.L.",""]}│├────────────────────────────────────┤│{"name":["Nadolsky","P.M.",""]}│├────────────────────────────────────┤│{"name":["Yuan","C.-P.",""]}│├────────────────────────────────────┤│{"name":["Streinu","Ileana",""]}│└────────────────────────────────────┘

由于这不是一件令人愉快的事情(并且会导致查询不是最优雅的),我们须要轻微清理一下。
我们还看到categories列可以有一个单独的种别,也可以有几个不采取传统列表格式的种别(如本示例的末了一行所示):

╒═══════════════════════════════════╕│"c"│╞═══════════════════════════════════╡│{"category":"hep-ph"}│├───────────────────────────────────┤│{"category":"math.COcs.CG"}│├───────────────────────────────────┤│{"category":"physics.gen-ph"}│├───────────────────────────────────┤│{"category":"math.CO"}│├───────────────────────────────────┤│{"category":"math.CAmath.FA"}│└───────────────────────────────────┘

我们可以在Cypher中这样做,但为了这篇文章的目的,我们将在Python中做清理,以便解释。

创建两个帮助函数来清理这两列:

defget_author_list(line):#打消authordataframe列,在行中创建作者列表。
return[e[1]+''+e[0]foreinline]defget_category_list(line):#打消“category”列,在该行中创建种别列表。
returnlist(line.split(""))df['cleaned_authors_list']=df['authors_parsed'].map(get_author_list)df['category_list']=df['categories'].map(get_category_list)df=df.drop(['submitter','authors','comments','journal-ref','doi','report-no','license','versions','update_date','abstract','authors_parsed','categories'],axis=1)

得到的数据帧:

现在我们有东西可以用了!

创建一个Neo4j沙箱

Neo4j沙箱可以对Neo4j免费利用。
你可以启动一个实例,该实例将持续3天并开始事情!

出于本文的目的,当进入沙箱时,你将创建一个基本的、空缺的沙箱,像这样:

正如你在创建窗口中看到的那样,还有许多其他有用的沙箱,但是我们将选择这个选项,由于我们将用自己的数据填充数据库。
安歇几分钟,等待运行完成。
一旦完成,你将得到你的连接信息,如下所示:

这个窗口有一些你须要的东西。
首先,你将把稳到Bolt URL,并完成其端口号。

要通过Python建立连接,你将须要这个。
接下来,你还须要密码(在本例中为“difficulties-pushup-gap”)。
这将须要验证到此实例中。
我要指出的是,3天后当这个实例被删除时,这些信息就不再有效了。

连接到Neo4j并填充数据库

现在,我们须要在本地机器(或任何有Python代码的地方)和沙箱数据库之间建立连接。
这就须要用到BOLT URL和密码。

我已经创建了一个helper类来做这一点:

classNeo4jConnection:def__init__(self,uri,user,pwd):self.__uri=uriself.__user=userself.__pwd=pwdself.__driver=Nonetry:self.__driver=GraphDatabase.driver(self.__uri,auth=(self.__user,self.__pwd))exceptExceptionase:print("Failedtocreatethedriver:",e)defclose(self):ifself.__driverisnotNone:self.__driver.close()defquery(self,query,parameters=None,db=None):assertself.__driverisnotNone,"Drivernotinitialized!"session=Noneresponse=Nonetry:session=self.__driver.session(database=db)ifdbisnotNoneelseself.__driver.session()response=list(session.run(query,parameters))exceptExceptionase:print("Queryfailed:",e)finally:ifsessionisnotNone:session.close()returnresponseconn=Neo4jConnection(uri="bolt://52.87.205.91:7687",user="neo4j",pwd="difficulties-pushup-gaps")

现在我们可以开始填充数据库了。
我们首先在数据库中创建一些约束,以确保节点不重复,同时建立一些索引:

conn.query('CREATECONSTRAINTpapersIFNOTEXISTSON(p:Paper)ASSERTp.idISUNIQUE')conn.query('CREATECONSTRAINTauthorsIFNOTEXISTSON(a:Author)ASSERTa.nameISUNIQUE')conn.query('CREATECONSTRAINTcategoriesIFNOTEXISTSON(c:Category)ASSERTc.categoryISUNIQUE')

现在创建三个函数来为category和author节点创建数据框,我们将利用它们分别添补到数据库中:

defadd_categories(categories):#向Neo4j图中添加种别节点。
query='''UNWIND$rowsASrowMERGE(c:Category{category:row.category})RETURNcount()astotal'''returnconn.query(query,parameters={'rows':categories.to_dict('records')})defadd_authors(rows,batch_size=10000):##以批处理作业的形式将作者节点添加到Neo4j图中。
query='''UNWIND$rowsASrowMERGE(:Author{name:row.author})RETURNcount()astotal'''returninsert_data(query,rows,batch_size)definsert_data(query,rows,batch_size=10000):#以批处理办法更新Neo4j数据库。
total=0batch=0start=time.time()result=Nonewhilebatchbatch_size<len(rows):res=conn.query(query,parameters={'rows':rows[batchbatch_sizebatch+1)batch_size].to_dict('records')})total+=res[0]['total']batch+=1result={"total":total,"batches":batch,"time":time.time()-start}print(result)returnresult

这些函数将每一列放入变量$rows中,这些列是列表格式的。

UNWIND命令获取列表中的每个实体并将其添加到数据库中。
在此之后,我们利用一个赞助函数以批处理模式更新数据库,当你处理超过50k的上传时,它会很有帮助。

加载这些节点后,我们将添加论文节点以及与所有关系:

defadd_papers(rows,batch_size=5000):#添加论文节点,(:Author)--(:Paper),#(:Paper)--(:Category)关系query='''UNWIND$rowsasrowMERGE(p:Paper{id:row.id})ONCREATESETp.title=row.title//connectcategoriesWITHrow,pUNWINDrow.category_listAScategory_nameMATCH(c:Category{category:category_name})MERGE(p)-[:IN_CATEGORY]->(c)//connectauthorsWITHdistinctrow,p//reducecardinalityUNWINDrow.cleaned_authors_listASauthorMATCH(a:Author{name:author})MERGE(a)-[:AUTHORED]->(p)RETURNcount(distinctp)astotal'''returninsert_data(query,rows,batch_size)

因此,与category和author节点类似,我们创建了每一篇论文,然后通过数据帧中每一行的:authorated或:IN_CATEGORY关系将其连接起来。

请把稳,在这个函数中有更多的数据在管道中移动,因此它可能有助于减少批处理大小,以防止超时缺点。

同样,在这个步骤中,我们可能会在完全的数据帧上利用类似于explosion的方法,为每个列表的每个元素获取一行,并以这种办法将全体数据帧载入到数据库中。
这是可行的,这正是我们将不才面对少量数据所做的。

然而,对付更大的数据集,将数据加载到Neo4j并不是一种非常有效的方法。
由于Neo4j是一个事务性数据库,我们创建一个数据库,数据帧的每一行就实行一条语句,这会非常缓慢。
它也可能超出可用内存。
沙箱实例有大约500 MB的堆内存和500 MB的页面缓存。
因此,这进一步推动了以批处理办法更新数据库。

实行所有这些函数来添补图,我们有:

categories=pd.DataFrame(df[['category_list']])categories.rename(columns={'category_list':'category'},inplace=True)categories=categories.explode('category')\.drop_duplicates(subset=['category'])authors=pd.DataFrame(df[['cleaned_authors_list']])authors.rename(columns={'cleaned_authors_list':'author'},inplace=True)authors=authors.explode('author').drop_duplicates(subset=['author'])add_categories(categories)add_authors(authors)add_papers(df)

太棒了!我们现在有一个填充数据库!下面是该图的子样本,通过该命令运行得到:MATCH (a:Author)-[:AUTHORED]->(p:Paper)-[:IN_CATEGORY]->(c:Category) RETURN a, p, c LIMIT 300

确保它有我们想要的东西……

查询数据库以得到一些答案

一个提示:当你有了一个已添补的数据库时,你该当让Neo4j处理尽可能多的打算,然后再将答案带回Python(如果你须要的话)。

在本例中,假设我们想打算每个类别的干系度,并返回前20个类别的种别。
显然,我们可以在Python中完成这个大略的事情,但在Neo4j中完成它。

在某些时候,你可能须要进行更繁芜的打算(例如节点中央性、路径查找或社区检测),这些都可以并且该当在将结果***回Python之前在Neo4j中完成。

为了在Cypher中做到这一点,我们可以利用许多方法,但这里有一个快速有效的方法:

query_string='''MATCH(c:Category)RETURNc.category_name,SIZE(()-[:IN_CATEGORY]->(c))ASinDegreeORDERBYinDegreeDESCLIMIT20'''top_cat_df=pd.DataFrame([dict(_)for_inconn.query(query_string)])top_cat_df.head(20)

这该当返回:

上述数据子集的入度分布如下:

因此,这表明数据库已经添补,以及我们如何得到结果。
无论如何,另一种方法可以得到相同的结果返回的列脸色势是:

result=conn.query(query_string)forrecordinresult:print(record['c.category'],record['inDegree'])总结

我们已经展示了如何从Python连接到Neo4j沙箱,并在知足哀求的情形下上传数据。

就像编码中的其他事情一样,有很多不同的方法可以实现这一点,我们鼓励感兴趣的用户紧张利用Cypher而不是Python来探索上面的演示。

通过利用Neo4j Python连接器,可以很随意马虎地在Python和Neo4j数据库之间来回切换,就像其他数据库一样。
这将为数据科学和机器学习带来各种令人愉快的可能性,比如自动节点分类、链接预测和节点聚类。

感谢阅读!