木叶三忍的博客

Syncing MongoDB to Elasticsearch with mongo-connector

Requirements

  • CentOS (at least 512 MB of RAM)
  • curl
  • Java (a JRE)
  • Python
  • pip
  • MongoDB
  • Elasticsearch
  • mongo-connector
  • elastic2-doc-manager

Most of the items above were covered in my earlier posts, which you can refer back to. Here I will only cover setting up the remaining four:

  • python
  • python-pip
  • mongo-connector
  • elastic2-doc-manager

Check the current Python installation

[root@lnmp soft]# yum info python
Loaded plugins: security
Installed Packages
Name        : python
Arch        : x86_64
Version     : 2.6.6
Release     : 52.el6
Size        : 78 k
Repo        : installed
# remaining output omitted...

As you can see, Python 2.6.6 is already installed on my machine.

Check the current python-pip installation:

[root@lnmp soft]# yum info python-pip
Loaded plugins: security
Installed Packages
Name        : python-pip
Arch        : noarch
Version     : 7.1.0
Release     : 1.el6
Size        : 6.6 M
Repo        : installed
# remaining output omitted...

python-pip is likewise already installed in my environment, so there is nothing more to do here.

Install mongo-connector with pip

[root@lnmp soft]# pip install mongo-connector

I use the Aliyun yum mirror. If you have no extra yum repositories configured and get a "no package available" error, you can add an Aliyun repository like I did; see my earlier post on adding the Aliyun yum source.

Installing elastic2-doc-manager

Since my Elasticsearch is version 5.0, I need to install the matching elastic2-doc-manager[elastic5] variant. For the full version compatibility table, see https://github.com/mongodb-labs/elastic2-doc-manager

We can install elastic2-doc-manager[elastic5] with pip:

[root@lnmp soft]# pip install elastic2-doc-manager[elastic5]

Configuring MongoDB

Log directory

[root@lnmp soft]# mkdir -pv /var/log/mongodb

Data directory

[root@lnmp soft]# mkdir -pv /data/db/mongodb

MongoDB's default data path is /data/db. Make sure the directory you use exists and that mongod has the appropriate permissions on it.

Create the MongoDB configuration file

I created the file /etc/mongodb.conf; put it wherever you like, since we simply pass the path to mongod as a startup option. Note the replication section: mongo-connector works by tailing a replica set's oplog, so we need a replica set even with a single node. Write the following into your mongodb.conf:

storage:
  dbPath: /data/db/mongodb
  journal:
    enabled: true
 
# Log storage
systemLog:
  destination: file
  logAppend: true
  path: /var/log/mongodb/mongod.log
 
# Network and port
net:
  port: 27017
  bindIp: 127.0.0.1

# Replication
replication:
  replSetName: rs0
  oplogSizeMB: 100

Start MongoDB

[root@lnmp soft]# mongod --config /etc/mongodb.conf 
[root@lnmp soft]# ps aux | grep mongod
root      2133  0.0  0.0   4420   772 pts/0    S+   12:48   0:00 grep mongod

It failed to start. Let's check the log; open /var/log/mongodb/mongod.log, which shows:

old lock file: /data/db/mongodb/mongod.lock.  probably means unclean shutdown,
but there are no journal files to recover.
this is likely human error or filesystem corruption.
please make sure that your journal directory is mounted.

This happened because I did not shut MongoDB down cleanly last time (I have a bad habit of killing processes with kill -9). My fix for now is simply to wipe the data directory, /data/db/mongodb; it is safe here because the instance holds no data yet. In a later post I will cover starting MongoDB from an init script and shutting it down cleanly; this one is only about data sync, so I won't test that here. Clear the data directory:

[root@lnmp soft]# rm -rf /data/db/mongodb/*

Run it again:

[root@lnmp soft]# mongod --config /etc/mongodb.conf

This time it starts correctly, but the terminal is now occupied and cannot be used or closed. To keep using the shell, run mongod in the background instead:

# press Enter a couple of times afterwards to get the prompt back
[root@lnmp soft]# nohup mongod --config /etc/mongodb.conf &
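Instead of nohup, you can also let mongod daemonize itself. A sketch of an optional addition to mongodb.conf (processManagement.fork is a standard mongod option; when forking, mongod requires a log destination, which our systemLog section already provides):

```yaml
# Optional: have mongod fork into the background on startup
processManagement:
  fork: true
```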

Check whether mongod is actually running:

[root@lnmp soft]# ps aux | grep mongod
root      2136  2.9  5.2 442308 54088 pts/0    Sl+  12:54   0:00 mongod --storageEngine=mmapv1 --config /etc/mongodb.conf
500       2157  0.0  0.0   4420   768 pts/1    S+   12:55   0:00 grep mongod

We can also run:

[root@lnmp soft]# netstat -tnlp | grep mongod
tcp        0      0 127.0.0.1:27017             0.0.0.0:*                   LISTEN      2136/mongod

Either result confirms that mongod started successfully.
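Besides ps and netstat, a quick TCP probe of the port works from any language. A minimal, stdlib-only Python sketch (host and port hardcoded to the values from our mongodb.conf):

```python
import socket

def port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# True when mongod is listening on the bindIp/port from mongodb.conf
print(port_open("127.0.0.1", 27017))
```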

Start the mongo client

Run:

[root@lnmp soft]# mongo

Type the initialization command:

rs.initiate()

Oops! It returns an error:

{
	"info2" : "no configuration specified. Using a default configuration for the set",
	"me" : "lnmp:27017",
	"ok" : 0,
	"errmsg" : "No host described in new configuration 1 for replica set rs0 maps to this node",
	"code" : 93
}

We are only missing one small thing: add this host to the members list. Paste the following into the shell:

config = {
    _id : "rs0",
     members : [
         {_id : 0, host : "127.0.0.1:27017"}
     ]
}

Then run the initializer again:

rs.initiate(config)

This time it succeeds:

> rs.initiate(config)
{ "ok" : 1 }

The command prompt now becomes:

rs0:OTHER>

Then run the following (note: rs.status without parentheses only prints the helper's source, as shown below; call rs.status() to actually execute it):

rs0:OTHER> rs.status
function () {
    return db._adminCommand("replSetGetStatus");
}

Notice the prompt has now changed to:

rs0:PRIMARY>

Since there is only one member, it gets elected as the primary!

Let's insert some test data into MongoDB. The database and collection names are up to you, but they are referenced again during the sync step, so to avoid mistakes, just use the same names I do.

Switch to the database

use connectortest

Create the collection

db.createCollection("syncthis");

Insert some data in a loop by pasting the following:

for (var i = 1; i <= 10; i++) {
   db.syncthis.insert( { value : i } )
}

Check the test data we inserted:

rs0:PRIMARY> db.syncthis.find()
{ "_id" : ObjectId("59b6c4ff796518572652aec7"), "value" : 1 }
{ "_id" : ObjectId("59b6c4ff796518572652aec8"), "value" : 2 }
{ "_id" : ObjectId("59b6c4ff796518572652aec9"), "value" : 3 }
{ "_id" : ObjectId("59b6c4ff796518572652aeca"), "value" : 4 }
{ "_id" : ObjectId("59b6c4ff796518572652aecb"), "value" : 5 }
{ "_id" : ObjectId("59b6c4ff796518572652aecc"), "value" : 6 }
{ "_id" : ObjectId("59b6c4ff796518572652aecd"), "value" : 7 }
{ "_id" : ObjectId("59b6c4ff796518572652aece"), "value" : 8 }
{ "_id" : ObjectId("59b6c4ff796518572652aecf"), "value" : 9 }
{ "_id" : ObjectId("59b6c4ff796518572652aed0"), "value" : 10 }

Keep this mongo client window open; we'll log in with another window for the next steps!

Start Elasticsearch

Log in to this new window as the elasticsearch user; I stressed this point in the previous post, so I won't go over it again here!

Start mongo-connector

[root@lnmp soft]# mongo-connector -m localhost:27017 -t http://localhost:9200 -o /var/log/mongodb/mongo-connector.log -d elastic2_doc_manager -n connectortest.syncthis

-m: the MongoDB host and port

-t: the host and port of the target system, in this case our Elasticsearch instance

-o: the file where mongo-connector stores its oplog progress timestamp; I put it under /var/log/mongodb, which we created earlier (despite the .log name I gave it, this is a resume checkpoint, not a log file)

-d: the doc manager to use, here elastic2_doc_manager

-n: the database.collection namespace to replicate
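As the flag list grows, it can be tidier to put everything into a JSON config file and start the connector with mongo-connector -c config.json. A sketch roughly equivalent to the command above (key names follow the example config shipped with mongo-connector 2.x; verify them against your installed version's documentation):

```json
{
  "mainAddress": "localhost:27017",
  "oplogFile": "/var/log/mongodb/mongo-connector.log",
  "namespaces": {
    "include": ["connectortest.syncthis"]
  },
  "docManagers": [
    {
      "docManager": "elastic2_doc_manager",
      "targetURL": "localhost:9200"
    }
  ]
}
```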

If you see the following message and the window no longer accepts input, the connector started successfully:

Logging to /root/mongo-connector.log.

OK! Open and log in to one more new window!

Testing the Elasticsearch data

[root@lnmp soft]# curl http://localhost:9200/connectortest/_search/?pretty=1

{
  "took" : 207,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 10,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "connectortest",
        "_type" : "syncthis",
        "_id" : "59b6c4ff796518572652aec7",
        "_score" : 1.0,
        "_source" : {
          "value" : 1.0
        }
      },
      {
        "_index" : "connectortest",
        "_type" : "syncthis",
        "_id" : "59b6c4ff796518572652aeca",
        "_score" : 1.0,
        "_source" : {
          "value" : 4.0
        }
      },
      {
        "_index" : "connectortest",
        "_type" : "syncthis",
        "_id" : "59b6c4ff796518572652aecb",
        "_score" : 1.0,
        "_source" : {
          "value" : 5.0
        }
      },
      {
        "_index" : "connectortest",
        "_type" : "syncthis",
        "_id" : "59b6c4ff796518572652aec9",
        "_score" : 1.0,
        "_source" : {
          "value" : 3.0
        }
      },
      {
        "_index" : "connectortest",
        "_type" : "syncthis",
        "_id" : "59b6c4ff796518572652aed0",
        "_score" : 1.0,
        "_source" : {
          "value" : 10.0
        }
      },
      {
        "_index" : "connectortest",
        "_type" : "syncthis",
        "_id" : "59b6c4ff796518572652aec8",
        "_score" : 1.0,
        "_source" : {
          "value" : 2.0
        }
      },
      {
        "_index" : "connectortest",
        "_type" : "syncthis",
        "_id" : "59b6c4ff796518572652aecd",
        "_score" : 1.0,
        "_source" : {
          "value" : 7.0
        }
      },
      {
        "_index" : "connectortest",
        "_type" : "syncthis",
        "_id" : "59b6c4ff796518572652aece",
        "_score" : 1.0,
        "_source" : {
          "value" : 8.0
        }
      },
      {
        "_index" : "connectortest",
        "_type" : "syncthis",
        "_id" : "59b6c4ff796518572652aecc",
        "_score" : 1.0,
        "_source" : {
          "value" : 6.0
        }
      },
      {
        "_index" : "connectortest",
        "_type" : "syncthis",
        "_id" : "59b6c4ff796518572652aecf",
        "_score" : 1.0,
        "_source" : {
          "value" : 9.0
        }
      }
    ]
  }
}
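Note that Elasticsearch returns the hits in arbitrary order. A small stdlib-only Python helper to pull the synced values out of a _search response and sort them (the sample dict below just mimics the shape of the response above):

```python
def synced_values(search_response):
    """Collect and sort the "value" fields from an Elasticsearch _search response."""
    return sorted(hit["_source"]["value"] for hit in search_response["hits"]["hits"])

# A trimmed-down response with the same shape as the output above
sample = {"hits": {"total": 3, "hits": [
    {"_id": "a", "_source": {"value": 2.0}},
    {"_id": "b", "_source": {"value": 1.0}},
    {"_id": "c", "_source": {"value": 3.0}},
]}}
print(synced_values(sample))  # [1.0, 2.0, 3.0]
```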

As you can see, this is exactly the data we inserted (the mongo shell stores numbers as doubles, which is why the values come back as 1.0, 2.0, and so on). Now back in the mongo client window, insert one more document:

rs0:PRIMARY> db.syncthis.insert( { value : 66666 } )
WriteResult({ "nInserted" : 1 })

Good. Back in the most recently opened window, run:

[root@lnmp soft]# curl localhost:9200/connectortest/_count/?pretty=1
{
  "count" : 11,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  }
}
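For scripted checks, the _count response is easy to consume. A minimal sketch that parses the JSON body (sample string copied from the response above):

```python
import json

def es_doc_count(body):
    """Extract the document count from an Elasticsearch _count response body."""
    return json.loads(body)["count"]

sample = '{"count": 11, "_shards": {"total": 5, "successful": 5, "failed": 0}}'
print(es_doc_count(sample))  # 11
```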

The count is now 11. Good; run the earlier search command once more:

[root@lnmp soft]# curl http://localhost:9200/connectortest/_search/?pretty=1

{
  "took" : 25,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 11,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "connectortest",
        "_type" : "syncthis",
        "_id" : "59b6c4ff796518572652aec7",
        "_score" : 1.0,
        "_source" : {
          "value" : 1.0
        }
      },
      {
        "_index" : "connectortest",
        "_type" : "syncthis",
        "_id" : "59b6c4ff796518572652aeca",
        "_score" : 1.0,
        "_source" : {
          "value" : 4.0
        }
      },
      {
        "_index" : "connectortest",
        "_type" : "syncthis",
        "_id" : "59b6c4ff796518572652aecb",
        "_score" : 1.0,
        "_source" : {
          "value" : 5.0
        }
      },
      {
        "_index" : "connectortest",
        "_type" : "syncthis",
        "_id" : "59b6c4ff796518572652aec9",
        "_score" : 1.0,
        "_source" : {
          "value" : 3.0
        }
      },
      {
        "_index" : "connectortest",
        "_type" : "syncthis",
        "_id" : "59b6c4ff796518572652aed0",
        "_score" : 1.0,
        "_source" : {
          "value" : 10.0
        }
      },
      {
        "_index" : "connectortest",
        "_type" : "syncthis",
        "_id" : "59b6c4ff796518572652aec8",
        "_score" : 1.0,
        "_source" : {
          "value" : 2.0
        }
      },
      {
        "_index" : "connectortest",
        "_type" : "syncthis",
        "_id" : "59b6c4ff796518572652aecd",
        "_score" : 1.0,
        "_source" : {
          "value" : 7.0
        }
      },
      {
        "_index" : "connectortest",
        "_type" : "syncthis",
        "_id" : "59b6c4ff796518572652aece",
        "_score" : 1.0,
        "_source" : {
          "value" : 8.0
        }
      },
      {
        "_index" : "connectortest",
        "_type" : "syncthis",
        "_id" : "59b6ca3e796518572652aed1",
        "_score" : 1.0,
        "_source" : {
          "value" : 66666.0
        }
      },
      {
        "_index" : "connectortest",
        "_type" : "syncthis",
        "_id" : "59b6c4ff796518572652aecc",
        "_score" : 1.0,
        "_source" : {
          "value" : 6.0
        }
      }
    ]
  }
}

As you can see, the 66666 we just inserted was synced automatically, and that wraps it up. With sync in place, next time I'll show how to write Laravel logs into MongoDB; since the data is synced to Elasticsearch automatically, we can then search and analyze logs and bugs. Sounds good, right? I'll get to it soon.

© 2019 木叶三忍的博客. All rights reserved.
