GVKun编程网logo

与MongoDB相比,使用Java驱动程序进行Cassandra批量写入的性能非常差(类似mongodb的数据库)

10

在本文中,我们将详细介绍与MongoDB相比,使用Java驱动程序进行Cassandra批量写入的性能非常差的各个方面,并为您提供关于类似mongodb的数据库的相关解答,同时,我们也将为您带来关于A

在本文中,我们将详细介绍与MongoDB相比,使用Java驱动程序进行Cassandra批量写入的性能非常差的各个方面,并为您提供关于类似mongodb的数据库的相关解答,同时,我们也将为您带来关于Android Studio:使用Mongo Java驱动程序连接到MongoDB服务器、C#、Java驱动连接MongoDB以及封装(C#的MongoDBHelper,Java的MongoDBUtil)、Cassandra Java驱动程序的最佳设置只能写入本地数据中心、Cassandra Java驱动程序:有多少接触点合理?的有用知识。

本文目录一览:

与MongoDB相比,使用Java驱动程序进行Cassandra批量写入的性能非常差(类似mongodb的数据库)

与MongoDB相比,使用Java驱动程序进行Cassandra批量写入的性能非常差(类似mongodb的数据库)

我已经为MongoDB和Cassandra构建了一个导入器。基本上,导入程序的所有操作都是相同的,除了最后一部分中,数据的形成与所需的cassandra表架构和所需的mongodb文档结构相匹配。与MongoDB相比,Cassandra的写入性能确实很差,我想我做错了。

基本上,我的抽象导入程序类加载数据,读出所有数据,并将其传递给扩展的MongoDBImporter或CassandraImporter类,以将数据发送到数据库。一次针对一个数据库-
不能同时向C *和MongoDB插入“双”。导入程序在同一台计算机上针对相同数量的节点(6)运行。


问题:

57分钟后,MongoDB导入完成。我摄取了10.000.000个文档,并且期望Cassandra的行数大致相同。我的Cassandra导入程序自2.5小时以来一直在运行,并且仅在插入的5.000.000行中运行。我将在这里等待进口商完成并编辑实际的完成时间。


如何使用Cassandra导入:

我准备两个语句 一旦 摄取数据之前。这两个语句都是 UPDATE查询,
因为有时我必须将数据追加到现有列表中。开始导入之前,我的表已完全清除。准备好的语句会一遍又一遍地使用。

PreparedStatement statementA = session.prepare(queryA);PreparedStatement statementB = session.prepare(queryB);

对于 每一 行,我创建一个BoundStatement并将该语句传递给我的“自定义”批处理方法:

    BoundStatement bs = new BoundStatement(preparedStatement); //either statementA or B    bs = bs.bind();    //add data... with several bs.setXXX(..) calls    cassandraConnection.executeBatch(bs);

使用MongoDB,我一次可以插入1000个文档(即最大数量)。对于Cassandra,com.datastax.driver.core.exceptions.InvalidQueryException:Batch toolarge在某个时候,导入程序仅崩溃了我的10条语句。我正在使用此代码来构建批次。顺便说一句,我之前以1000、500、300、200、100、50、20批处理开始,但显然它们也不起作用。然后将其设置为10,然后再次引发异常。现在我不知道为什么会中断。

private static final int MAX_BATCH_SIZE = 10;private Session session;private BatchStatement currentBatch;...@Overridepublic ResultSet executeBatch(Statement statement) {    if (session == null) {        throw new IllegalStateException(CONNECTION_STATE_EXCEPTION);    }    if (currentBatch == null) {        currentBatch = new BatchStatement(Type.UNLOGGED);    }    currentBatch.add(statement);    if (currentBatch.size() == MAX_BATCH_SIZE) {        ResultSet result = session.execute(currentBatch);        currentBatch = new BatchStatement(Type.UNLOGGED);        return result;    }    return null;}

我的C *模式如下所示

CREATE TYPE stream.event (    data_dbl frozen<map<text, double>>,    data_str frozen<map<text, text>>,    data_bool frozen<map<text, boolean>>,);CREATE TABLE stream.data (    log_creator text,    date text, //date of the timestamp    ts timestamp,    log_id text, //some id    hour int, //just the hour of the timestmap    x double,    y double,    events list<frozen<event>>,    PRIMARY KEY ((log_creator, date, hour), ts, log_id)) WITH CLUSTERING ORDER BY (ts ASC, log_id ASC)

有时我需要向现有行中添加其他新事件。这就是为什么我需要一个UDT列表的原因。我的UDT包含三个映射,因为事件创建者会产生不同的数据(字符串/双精度/布尔型的键/值对)。我知道以下事实:UDT已冻结,并且我无法触摸已摄取事件的地图。对我来说很好,我只需要添加有时具有相同时间戳的新事件。我对日志的创建者(一些传感器名称),记录的日期(即“
22-09-2016”)和时间戳的小时数进行分区(以在分配更多数据的同时将相关数据保持在一起)一个分区)。


我在Pom中使用Cassandra 3.0.8和Datastax Java
Driver,版本3.1.0。根据什么是卡桑德拉的批次限制?,我不应该通过调整增加批量大小batch_size_fail_threshold_in_kb在我的cassandra.yaml。那么…我的导入有什么问题或出了什么问题?


更新
因此,我已经调整了代码以运行异步查询并将当前正在运行的插入存储在列表中。每当异步插入完成时,它将从列表中删除。当列表大小超过阈值并且之前在插入中发生错误时,该方法将等待500ms直到插入低于阈值。现在,当没有插入失败时,我的代码会自动提高阈值。

但是,在流处理330万行之后,处理了280.000个插入,但是没有发生错误。
当前正在处理的插入数似乎太高。6个cassandra节点在2岁的商用硬件上运行。

这是否是大量并发插入(6个节点为280.000)的问题?我应该添加像这样的变量MAX_CONCURRENT_INSERT_LIMIT吗?

private List<ResultSetFuture> runningInsertList;private static int concurrentInsertLimit = 1000;private static int concurrentInsertSleepTime = 500;...@Overridepublic void executeBatch(Statement statement) throws InterruptedException {    if (this.runningInsertList == null) {        this.runningInsertList = new ArrayList<>();    }    //Sleep while the currently processing number of inserts is too high    while (concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) {        Thread.sleep(concurrentInsertSleepTime);    }    ResultSetFuture future = this.executeAsync(statement);    this.runningInsertList.add(future);    Futures.addCallback(future, new FutureCallback<ResultSet>() {        @Override        public void onSuccess(ResultSet result) {            runningInsertList.remove(future);        }        @Override        public void onFailure(Throwable t) {            concurrentInsertErrorOccured = true;        }    }, MoreExecutors.sameThreadExecutor());    if (!concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) {        concurrentInsertLimit += 2000;        LOGGER.info(String.format("New concurrent insert limit is %d", concurrentInsertLimit));    }    return;}

答案1

小编典典

在使用了C 一段时间之后,我相信您应该只将批处理用于保持多个表同步。如果您不需要该 功能 ,则完全不要使用批处理,因为这 * 导致性能下降。

将数据加载到C *的正确方法是异步写入,如果集群无法跟上接收速度,则可以使用可选的反压。您应该用以下方法替换“自定义”批处理方法:

  • 执行异步写入
  • 控制下有多少机上写
  • 写超时时执行一些重试。

要执行异步写入,请使用.executeAsync方法,该方法将返回一个ResultSetFuture对象。

为了控制下,有多少个运行中查询只是ResultSetFuture将从.executeAsync方法中检索到的对象收集在一个列表中,并且如果列表得到了(此处的计算值),则说1k个元素,然后等待所有这些元素完成后再发出更多写操作。或者,您可以等待第一个完成后再发出更多写操作,以保持列表完整。

最后,您可以在等待操作完成时检查写入失败。在这种情况下,您可以:

  1. 用相同的超时值再次写入
  2. 以增加的超时值再次写入
  3. 等待一段时间,然后使用相同的超时值再次写入
  4. 等待一段时间,然后以增加的超时值再次写入

从1到4,背压 强度 增加。选择最适合您的情况的一种。


问题更新后编辑

您的插入逻辑对我来说似乎有点混乱:

  1. 我看不到任何 重试 逻辑
  2. 如果失败,则不要删除列表中的项目
  3. while (concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit)是错误的,因为仅当发出的查询数>时您才会进入睡眠状态concurrentInsertLimit,并且由于2.您的线程将仅停留在该位置。
  4. 你永远不会设置为假 concurrentInsertErrorOccured

我通常会保留(失败的)查询列表,以便稍后重试。这使我可以对查询进行有力的控制,并且当失败的查询开始累积时,我会睡一会儿,然后继续重试它们(最多X次,然后出现严重失败…)。

该列表应该非常动态,例如,当查询失败时,您可以在其中添加项目,而在执行重试时,则可以删除项目。现在,您可以了解群集的限制,并concurrentInsertLimit根据例如最近一秒内失败查询的平均数量进行调整,或者使用更简单的方法“
如果重试列表中有项目则暂停 ”等。


注释后编辑2

由于您不需要任何重试逻辑,因此我将以这种方式更改代码:

private List<ResultSetFuture> runningInsertList;private static int concurrentInsertLimit = 1000;private static int concurrentInsertSleepTime = 500;...@Overridepublic void executeBatch(Statement statement) throws InterruptedException {    if (this.runningInsertList == null) {        this.runningInsertList = new ArrayList<>();    }    ResultSetFuture future = this.executeAsync(statement);    this.runningInsertList.add(future);    Futures.addCallback(future, new FutureCallback<ResultSet>() {        @Override        public void onSuccess(ResultSet result) {            runningInsertList.remove(future);        }        @Override        public void onFailure(Throwable t) {            runningInsertList.remove(future);            concurrentInsertErrorOccured = true;        }    }, MoreExecutors.sameThreadExecutor());    //Sleep while the currently processing number of inserts is too high    while (runningInsertList.size() >= concurrentInsertLimit) {        Thread.sleep(concurrentInsertSleepTime);    }    if (!concurrentInsertErrorOccured) {        // Increase your ingestion rate if no query failed so far        concurrentInsertLimit += 10;    } else {        // Decrease your ingestion rate because at least one query failed        concurrentInsertErrorOccured = false;        concurrentInsertLimit = Max(1, concurrentInsertLimit - 50);        while (runningInsertList.size() >= concurrentInsertLimit) {            Thread.sleep(concurrentInsertSleepTime);        }    }    return;}

您还可以通过用List<ResultSetFuture>计数器代替来优化过程。

希望能有所帮助。

Android Studio:使用Mongo Java驱动程序连接到MongoDB服务器

Android Studio:使用Mongo Java驱动程序连接到MongoDB服务器

关于此问题的文章很多,但是似乎没人能解决,因此maby的某些情况已经改变。

我正在尝试将我的Android应用程序连接到位于mLab上的MongoDB服务器。我正在使用Mongo Java
Drived,当然已经将该库添加到android studio中。

可以启动应用程序,但是当我单击注册按钮时,应用程序崩溃了。

这是我的代码:

import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.view.View;
import android.widget.Button;
import android.widget.EditText;
import org.bson.Document;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;

public class MainActivity extends AppCompatActivity {

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);

    final EditText username = (EditText)findViewById(R.id.username);
    final Button bRegister = (Button) findViewById(R.id.bRegister);

    bRegister.setOnClickListener(new View.OnClickListener() {
        @Override
        public void onClick(View v) {
            String stringUsername = username.toString();
            try {
                addToDatabase(stringUsername);
            } catch(Exception e) {
                e.printStackTrace();
            }
        }
    });
}

private static void addToDatabase(String username){
    MongoClientURI uri  = new MongoClientURI("mongodb:///*mLab database URL */");
    MongoClient client = new MongoClient(uri);
    MongoDatabase db = client.getDatabase(uri.getDatabase());
    MongoCollection<Document> coll = db.getCollection("newDB");

    Document doc = new Document("username",username);
    coll.insertOne(doc);
    client.close();
}
}

我在清单文件中也有permision.INTERNET。

任何帮助将不胜感激!

//编辑Stacktrace:03/10 02:43:09: Launching app Cold swapped changes. $ adb shell am start -n "com.newhdc.pedergb.mongodb_servertester/com.newhdc.pedergb.mongodb_servertester.MainActivity" -a android.intent.action.MAIN -c android.intent.category.LAUNCHER Client not ready yet..Waiting for process to come online Connected to process 4752 on device emulator-5554 D/NetworkSecurityConfig: No Network Security Config specified,using platform default W/org.bson.ObjectId: Failed to get process identifier from JMX,using random number instead java.lang.NoClassDefFoundError: Failed resolution of: Ljava/lang/management/ManagementFactory; at org.bson.types.ObjectId.createProcessIdentifier(ObjectId.java:533) at org.bson.types.ObjectId.<clinit>(ObjectId.java:491) at com.mongodb.connection.ClusterId.<init>(ClusterId.java:47) at com.mongodb.connection.DefaultClusterFactory.create(DefaultClusterFactory.java:105) at com.mongodb.Mongo.createCluster(Mongo.java:744) at com.mongodb.Mongo.createCluster(Mongo.java:728) at com.mongodb.Mongo.createCluster(Mongo.java:702) at com.mongodb.Mongo.<init>(Mongo.java:310) at com.mongodb.Mongo.<init>(Mongo.java:306) at com.mongodb.MongoClient.<init>(MongoClient.java:284) at com.newhdc.pedergb.mongodb_servertester.MainActivity.addToDatabase(MainActivity.java:39) at com.newhdc.pedergb.mongodb_servertester.MainActivity.access$000(MainActivity.java:14) at com.newhdc.pedergb.mongodb_servertester.MainActivity$1.onClick(MainActivity.java:29) at android.view.View.performClick(View.java:5637) at android.view.View$PerformClick.run(View.java:22429) at android.os.Handler.handleCallback(Handler.java:751) at android.os.Handler.dispatchMessage(Handler.java:95) at android.os.Looper.loop(Looper.java:154) at android.app.ActivityThread.main(ActivityThread.java:6119) at java.lang.reflect.Method.invoke(Native Method) at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:886) at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:776) Caused by: java.lang.ClassNotFoundException: Didn't find class "java.lang.management.ManagementFactory" on path: DexPathList[[dex file "/data/data/com.newhdc.pedergb.mongodb_servertester/files/instant- run/dex/slice-support- annotations-25.1.0_11ac1b6ae4b8623fca16868c12f685674e962f99-classes.dex",dex file "/data/data/com.newhdc.pedergb.mongodb_servertester/files/instant- run/dex/slice-slice_9-classes.dex",dex file "/data/data/com.newhdc.pedergb.mongodb_servertester/files/instant- run/dex/slice-slice_8-classes.dex",dex file "/data/data/com.newhdc.pedergb.mongodb_servertester/files/instant- run/dex/slice-slice_7-classes.dex",dex file "/data/data/com.newhdc.pedergb.mongodb_servertester/files/instant- run/dex/slice-slice_6-classes.dex",dex file "/data/data/com.newhdc.pedergb.mongodb_servertester/files/instant- run/dex/slice-slice_5-classes.dex",dex file "/data/data/com.newhdc.pedergb.mongodb_servertester/files/instant- run/dex/slice-slice_4-classes.dex",dex file "/data/data/com.newhdc.pedergb.mongodb_servertester/files/instant- run/dex/slice-slice_3-classes.dex",dex file "/data/data/com.newhdc.pedergb.mongodb_servertester/files/instant- run/dex/slice-slice_2-classes.dex",dex file "/data/data/com.newhdc.pedergb.mongodb_servertester/files/instant- run/dex/slice-slice_1-classes.dex",dex file "/data/data/com.newhdc.pedergb.mongodb_servertester/files/instant- run/dex/slice-slice_0-classes.dex",dex file "/data/data/com.newhdc.pedergb.mongodb_servertester/files/instant- run/dex/slice-mongodb-driver- core-3.4.2_cf1ecbf321a58b8bf97e118b2c0ff7614ac982a5-classes.dex",dex file "/data/data/com.newhdc.pedergb.mongodb_servertester/files/instant- run/dex/slice-mongodb-driver-3.4.2_cfefe7ed281d321e57736b38e1e68fc6160680ac- classes.dex",dex file "/data/data/com.newhdc.pedergb.mongodb_servertester/files/instant- run/dex/slice-com.android.support-support-vector- drawable-25.1.0_3dbe341ffa762dac2cc1137bc6aae1731f3bc1c0-classes.dex",dex file "/data/data/com.newhdc.pedergb.mongodb_servertester/files/instant- run/dex/slice-com.android.support- support-v4-25.1.0_c534a46cb17b55c593319a94e0d90e0b75103a24-classes.dex",dex file "/data/data/com.newhdc.pedergb.mongodb_servertester/files/instant- run/dex/slice-com.android.support-support-media- compat-25.1.0_b58e3876df91b49420cb0766dd6edfdbff0dedbc-classes.dex",dex file "/data/data/com.newhdc.pedergb.mongodb_servertester/files/instant- run/dex/slice-com.android.support-support- fragment-25.1.0_d616629f11d994c207dfc4b5d01648e3194bccbc-classes.dex",dex f I/cluster: Cluster created with settings {hosts=[ds123080.mlab.com:23080],mode=SINGLE,requiredClusterType=UNKNOWN,serverSelectionTimeout='30000 ms',maxWaitQueueSize=500} D/AndroidRuntime: Shutting down VM E/AndroidRuntime: **FATAL EXCEPTION**: main Process: com.newhdc.pedergb.mongodb_servertester,PID: 4752 java.lang.ExceptionInInitializerError at com.mongodb.connection.InternalStreamConnectionFactory.<init>(InternalStreamConnectionFactory.java:41) at com.mongodb.connection.DefaultClusterableServerFactory.create(DefaultClusterableServerFactory.java:68) at com.mongodb.connection.BaseCluster.createServer(BaseCluster.java:360) at com.mongodb.connection.SingleServerCluster.<init>(SingleServerCluster.java:54) at com.mongodb.connection.DefaultClusterFactory.create(DefaultClusterFactory.java:114) at com.mongodb.Mongo.createCluster(Mongo.java:744) at com.mongodb.Mongo.createCluster(Mongo.java:728) at com.mongodb.Mongo.createCluster(Mongo.java:702) at com.mongodb.Mongo.<init>(Mongo.java:310) at com.mongodb.Mongo.<init>(Mongo.java:306) at com.mongodb.MongoClient.<init>(MongoClient.java:284) at com.newhdc.pedergb.mongodb_servertester.MainActivity.addToDatabase(MainActivity.java:39) at com.newhdc.pedergb.mongodb_servertester.MainActivity.access$000(MainActivity.java:14) at com.newhdc.pedergb.mongodb_servertester.MainActivity$1.onClick(MainActivity.java:29) at android.view.View.performClick(View.java:5637) at android.view.View$PerformClick.run(View.java:22429) at android.os.Handler.handleCallback(Handler.java:751) at android.os.Handler.dispatchMessage(Handler.java:95) at android.os.Looper.loop(Looper.java:154) at android.app.ActivityThread.main(ActivityThread.java:6119) at java.lang.reflect.Method.invoke(Native Method) at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:886) at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:776) Caused by: java.lang.NullPointerException: Attempt to invoke virtual method 'java.security.CodeSource java.security.ProtectionDomain.getCodeSource()' on a null object reference at com.mongodb.connection.ClientMetadataHelper.getDriverVersion(ClientMetadataHelper.java:111) at com.mongodb.connection.ClientMetadataHelper.getDriverInformation(ClientMetadataHelper.java:201) at com.mongodb.connection.ClientMetadataHelper.addDriverInformation(ClientMetadataHelper.java:182) at com.mongodb.connection.ClientMetadataHelper.<clinit>(ClientMetadataHelper.java:64) at com.mongodb.connection.InternalStreamConnectionFactory.<init>(InternalStreamConnectionFactory.java:41) at com.mongodb.connection.DefaultClusterableServerFactory.create(DefaultClusterableServerFactory.java:68) at com.mongodb.connection.BaseCluster.createServer(BaseCluster.java:360) at com.mongodb.connection.SingleServerCluster.<init>(SingleServerCluster.java:54) at com.mongodb.connection.DefaultClusterFactory.create(DefaultClusterFactory.java:114) at com.mongodb.Mongo.createCluster(Mongo.java:744) at com.mongodb.Mongo.createCluster(Mongo.java:728) at com.mongodb.Mongo.createCluster(Mongo.java:702) at com.mongodb.Mongo.<init>(Mongo.java:310) at com.mongodb.Mongo.<init>(Mongo.java:306) at com.mongodb.MongoClient.<init>(MongoClient.java:284) at com.newhdc.pedergb.mongodb_servertester.MainActivity.addToDatabase(MainActivity.java:39) at com.newhdc.pedergb.mongodb_servertester.MainActivity.access$000(MainActivity.java:14) at com.newhdc.pedergb.mongodb_servertester.MainActivity$1.onClick(MainActivity.java:29) at android.view.View.performClick(View.java:5637) at android.view.View$PerformClick.run(View.java:22429) at android.os.Handler.handleCallback(Handler.java:751) at android.os.Handler.dispatchMessage(Handler.java:95) at android.os.Looper.loop(Looper.java:154) at android.app.ActivityThread.main(ActivityThread.java:6119) at java.lang.reflect.Method.invoke(Native Method) at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:886) at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:776) Application terminated.

C#、Java驱动连接MongoDB以及封装(C#的MongoDBHelper,Java的MongoDBUtil)

C#、Java驱动连接MongoDB以及封装(C#的MongoDBHelper,Java的MongoDBUtil)

一.C#驱动连接MongoDB

1.创建项目

执行命令:dotnet new console -n MongoDbDriverDemo

 

 

 2.添加依赖包

执行命令:dotnet  add package MongoDB.Driver --version 2.10.2

 

 

 3.测试

using System;
using System.Linq;
using MongoDB.Bson;
using MongoDB.Driver;

namespace MongoDBDriverDemo
{
    class Program
    {

        async static System.Threading.Tasks.Task Main(string[] args)
        {
            try
            {
                MongoClient client = new MongoClient("mongodb://localhost:27017");

                var database = client.GetDatabase("foo");

                var collection = database.GetCollection<BsonDocument>("bar");

                Console.WriteLine("-----------------添加文档--------------------");
                {
                    var document = new BsonDocument
                               {
                                   { "name", "MongoDB" },
                                   { "type", "Database" },
                                   { "count", 1 },
                                   { "info", new BsonDocument
                                       {
                                           { "x", 203 },
                                           { "y", 102 }
                                       }}
                               };

                    await collection.InsertOneAsync(document);//异步
                    //collection.InsertOne(document);

                    var documents = Enumerable.Range(0, 100).Select(i => new BsonDocument("i", i));
                    //collection.InsertMany(documents);
                    await collection.InsertManyAsync(documents);
                }
                Console.WriteLine("------------------统计文档--------------------");
                {
                                 
                    var count = collection.CountDocuments(new BsonDocument());
                    var asyncCount = await collection.CountDocumentsAsync(new BsonDocument());
                    Console.WriteLine(count);
                    Console.WriteLine(asyncCount);
                }
                Console.WriteLine("-----------------查询文档--------------------");
                {
                    Console.WriteLine("------------------查询一个--------------------");
                    { 
                        var document = collection.Find(new BsonDocument()).FirstOrDefault();
                        Console.WriteLine(document.ToString());

                        var asyncDocument = await collection.Find(new BsonDocument()).FirstOrDefaultAsync();
                        Console.WriteLine(asyncDocument.ToString());
                    }
                    Console.WriteLine("------------------查询多个--------------------");
                    {
                        var documentList = collection.Find(new BsonDocument()).ToList();
                        documentList.ForEach(d => Console.WriteLine(d.ToString()));

                        var asyncDocumentList =await collection.Find(new BsonDocument()).ToListAsync();
                        await collection.Find(new BsonDocument()).ForEachAsync(d => Console.WriteLine(d));
                    }
                    {
                        var cursor = collection.Find(new BsonDocument()).ToCursor();
                        foreach (var document in cursor.ToEnumerable())
                        {
                            Console.WriteLine(document);
                        }
                    }
                }
                {
                    {
                        var filter = Builders<BsonDocument>.Filter.Eq("i", 71);
                        {
                            var document = collection.Find(filter).First();
                            Console.WriteLine(document);
                        }
                        {
                            var document = await collection.Find(filter).FirstAsync();
                            Console.WriteLine(document);
                        }
                    }
                    Console.WriteLine("------------------过滤文档--------------------");
                    {
                        var filter = Builders<BsonDocument>.Filter.Gt("i", 50);
                        {
                            var cursor = collection.Find(filter).ToCursor();
                            foreach (var document in cursor.ToEnumerable())
                            {
                                Console.WriteLine(document);
                            }
                        }
                        {
                            await collection.Find(filter).ForEachAsync(document => Console.WriteLine(document));
                        }
                    }
                    {
                        var filterBuilder = Builders<BsonDocument>.Filter;
                        var filter = filterBuilder.Gt("i", 50) & filterBuilder.Lte("i", 100);
                        {
                            var cursor = collection.Find(filter).ToCursor();
                            foreach (var document in cursor.ToEnumerable())
                            {
                                Console.WriteLine(document);
                            }
                        }
                        {
                            await collection.Find(filter).ForEachAsync(document => Console.WriteLine(document));
                        }
                    }
                }
                Console.WriteLine("------------------排序文档--------------------");
                {
                    var filter = Builders<BsonDocument>.Filter.Exists("i");
                    var sort = Builders<BsonDocument>.Sort.Descending("i");
                    {
                        var document = collection.Find(filter).Sort(sort).First();
                    }

                    {
                        var document = await collection.Find(filter).Sort(sort).FirstAsync();
                    }
                }
                Console.WriteLine("------------------过滤文档--------------------");
                {
                    var projection = Builders<BsonDocument>.Projection.Exclude("_id");
                    {
                        var document = collection.Find(new BsonDocument()).Project(projection).First();
                        Console.WriteLine(document.ToString());
                    }
                    {
                        var document = await collection.Find(new BsonDocument()).Project(projection).FirstAsync();
                        Console.WriteLine(document);
                    }
                }
                Console.WriteLine("------------------更新文档--------------------");
                {
                    Console.WriteLine("------------------更新一个--------------------");
                    {
                        var filter = Builders<BsonDocument>.Filter.Eq("i", 10);
                        var update = Builders<BsonDocument>.Update.Set("i", 10);
                        {
                            collection.UpdateOne(filter, update);
                        }
                        {
                            await collection.UpdateOneAsync(filter, update);
                        }
                    }
                    Console.WriteLine("------------------更新多个--------------------");
                    {
                        var filter = Builders<BsonDocument>.Filter.Lt("i", 100);
                        var update = Builders<BsonDocument>.Update.Inc("i", 100);
                        {
                            var result = collection.UpdateMany(filter, update);
                            if (result.IsModifiedCountAvailable) Console.WriteLine(result.ModifiedCount);
                        }
                        {
                            var result = await collection.UpdateManyAsync(filter, update);
                            if (result.IsModifiedCountAvailable) Console.WriteLine(result.ModifiedCount);
                        }
                    }
                }
                Console.WriteLine("------------------刪除文档--------------------");
                {
                    Console.WriteLine("------------------刪除单个--------------------");
                    {
                        var filter = Builders<BsonDocument>.Filter.Eq("i", 110);
                        {
                            collection.DeleteOne(filter);
                        }
                        {
                            await collection.DeleteOneAsync(filter);
                        }
                    }
                    Console.WriteLine("------------------删除多个--------------------");
                    {
                        var filter = Builders<BsonDocument>.Filter.Gte("i", 100);
                        {
                            var result = collection.DeleteMany(filter);
                            Console.WriteLine(result.DeletedCount);
                        }
                        {
                            var result = await collection.DeleteManyAsync(filter);
                            Console.WriteLine(result.DeletedCount);
                        }
                    }
                }
              
                Console.WriteLine("------------------大量写入--------------------");
                {
                    var models = new WriteModel<BsonDocument>[]
                                 {
                                     new InsertOneModel<BsonDocument>(new BsonDocument("_id", 4)),
                                     new InsertOneModel<BsonDocument>(new BsonDocument("_id", 5)),
                                     new InsertOneModel<BsonDocument>(new BsonDocument("_id", 6)),
                                     new UpdateOneModel<BsonDocument>(
                                         new BsonDocument("_id", 1),
                                         new BsonDocument("$set", new BsonDocument("x", 2))),
                                     new DeleteOneModel<BsonDocument>(new BsonDocument("_id", 3)),
                                     new ReplaceOneModel<BsonDocument>(
                                         new BsonDocument("_id", 3),
                                         new BsonDocument("_id", 3).Add("x", 4))
                                 };
                    {
                        {
                            Console.WriteLine("-------------有序批量操作-保证操作顺序-------------------");
                            collection.BulkWrite(models);
                        }
                        /*
                        {
                            Console.WriteLine("-------------无序批量操作-不保证操作顺序-------------------");
                            collection.BulkWrite(models, new BulkWriteOptions { IsOrdered = false });
                        }*/
                    }
                   /* {
                        Console.WriteLine("-------------有序批量操作-保证操作顺序-------------------");
                        await collection.BulkWriteAsync(models);

                        Console.WriteLine("-------------无序批量操作-不保证操作顺序-------------------");
                        await collection.BulkWriteAsync(models, new BulkWriteOptions { IsOrdered = false });
                    }*/
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
                         
            Console.ReadKey();            
        }
    }
    
}

4.结果

 

C#封装MongoDB

using MongoDB.Driver;
using MongoDB.Bson;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Collections;
using System.Linq.Expressions;

namespace MongoDBDriverDemo
{
    public class MongoDBHelper
    {
        private readonly string mongoDBConnString = null;
        private readonly string databaseName = null;
        private IMongoDatabase database = null;
        private readonly bool autoCreateDb = false;
        private readonly bool autoCreateCollection = false;

        static MongoDBHelper()
        {
            BsonDefaults.GuidRepresentation = GuidRepresentation.Standard;
        }

        public MongoDBHelper(string mongoDBConnString, string databaseName, bool autoCreateDb = false, bool autoCreateCollection = false)
        {
            this.mongoDBConnString = mongoDBConnString;
            this.databaseName = databaseName;
            this.autoCreateDb = autoCreateDb;
            this.autoCreateCollection = autoCreateCollection;
        }

        private MongoClient CreateMongoClient()
        {
            return new MongoClient(mongoDBConnString);
        }

        private IMongoDatabase GetMongoDatabase()
        {
            if (database == null)
            {
                MongoClient client = this.CreateMongoClient();
                if (!this.DatabaseExists(client, databaseName) && !autoCreateDb)
                {
                    throw new KeyNotFoundException("此MongoDB名称不存在:" + databaseName);
                }
            }
            database = CreateMongoClient().GetDatabase(databaseName);
            return database;
        }

        private bool DatabaseExists(MongoClient client, string databaseName)
        {
            try
            {
                var databaseNames = client.ListDatabases().ToList().Select(db => db.GetValue("name").AsString);
                return databaseNames.Contains(databaseName);
            }
            catch
            {
                return true;
            }
        }

        private bool CollectionExists(IMongoDatabase database, string collectionName)
        {
            var options = new ListCollectionsOptions
            {
                Filter = Builders<BsonDocument>.Filter.Eq("name", collectionName)
            };

            return database.ListCollections(options).ToEnumerable().Any();
        }

        private IMongoCollection<TDoc> GetMongoCollection<TDoc>(string name, MongoCollectionSettings settings = null)
        {
            IMongoDatabase mongoDatabase = GetMongoDatabase();
            if (!this.CollectionExists(mongoDatabase, name) && !autoCreateCollection)
            {
                throw new KeyNotFoundException("此Collection名称不存在:" + name);
            }

            return mongoDatabase.GetCollection<TDoc>(name, settings);
        }

        private List<UpdateDefinition<TDoc>> BuildUpdateDefinition<TDoc>(object doc, string parent)
        {
            var updateList = new List<UpdateDefinition<TDoc>>();
            foreach (var property in typeof(TDoc).GetProperties(System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.Public))
            {
                var key = parent == null ? property.Name : $"{parent}.{property.Name}";
                if ((property.PropertyType.IsClass || property.PropertyType.IsInterface) && property.PropertyType != typeof(string) && property.GetValue(doc) != null)
                {
                    if (typeof(IList).IsAssignableFrom(property.PropertyType))
                    {
                        int i = 0;
                        var subObj = property.GetValue(doc);
                        foreach (var item in subObj as IList)
                        {
                            if (item.GetType().IsClass || item.GetType().IsInterface)
                            {
                                updateList.AddRange(BuildUpdateDefinition<TDoc>(doc, $"{key}.{i}"));
                            }
                            else
                            {
                                updateList.Add(Builders<TDoc>.Update.Set($"{key}.{i}", item));
                            }
                            i++;
                        }
                    }
                    else
                    {
                        var subObj = property.GetValue(doc);
                        foreach (var sub in property.PropertyType.GetProperties(System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.Instance))
                        {
                            updateList.Add(Builders<TDoc>.Update.Set($"{key}.{sub.Name}", sub.GetValue(subObj)));
                        }
                    }
                }
                else
                {
                    updateList.Add(Builders<TDoc>.Update.Set(key, property.GetValue(doc)));
                }

            }
            return updateList;
        }


        private void CreateIndex<TDoc>(IMongoCollection<TDoc> collection, string[] indexFields, CreateOneIndexOptions options = null)
        {
            if (indexFields == null) return;
            var indexKeys = Builders<TDoc>.IndexKeys;
            IndexKeysDefinition<TDoc> keys = null;
            if (indexFields.Length > 0)
                keys = indexKeys.Descending(indexFields[0]);
            for (int i = 1; i < indexFields.Length; i++)
            {
                var strIndex = indexFields[i];
                keys = keys.Descending(strIndex);
            }
            if (keys != null)
                collection.Indexes.CreateOne(new CreateIndexModel<TDoc>(keys), options);
        }


        public void CreateCollectionIndex<TDoc>(string collectionName, string[] indexFields, CreateOneIndexOptions options = null)
            => this.CreateIndex(GetMongoCollection<TDoc>(collectionName), indexFields, options);


        public void CreateCollection<TDoc>(string[] indexFields = null, CreateOneIndexOptions options = null)
            => this.CreateCollection<TDoc>(typeof(TDoc).Name, indexFields, options);

        public void CreateCollection<TDoc>(string collectionName, string[] indexFields = null, CreateOneIndexOptions options = null)
        {
            var mongoDatabase = this.GetMongoDatabase();
            mongoDatabase.CreateCollection(collectionName);
            CreateIndex(this.GetMongoCollection<TDoc>(collectionName), indexFields, options);
        }

        public List<TDoc> Find<TDoc>(Expression<Func<TDoc, bool>> filter, FindOptions options = null)
            => Find<TDoc>(typeof(TDoc).Name, filter, options);

        public List<TDoc> Find<TDoc>(string collectionName, Expression<Func<TDoc, bool>> filter, FindOptions options = null)
            => this.GetMongoCollection<TDoc>(collectionName).Find(filter, options).ToList();

        public List<TDoc> FindByPage<TDoc, TResult>(Expression<Func<TDoc, bool>> filter, Expression<Func<TDoc, TResult>> keySelector, int pageIndex, int pageSize, out int rsCount)
        {
            string collectionName = typeof(TDoc).Name;
            return FindByPage<TDoc, TResult>(collectionName, filter, keySelector, pageIndex, pageSize, out rsCount);
        }

        public List<TDoc> FindByPage<TDoc, TResult>(string collectionName, Expression<Func<TDoc, bool>> filter, Expression<Func<TDoc, TResult>> keySelector, int pageIndex, int pageSize, out int rsCount)
        {
            var colleciton = GetMongoCollection<TDoc>(collectionName);
            rsCount = colleciton.AsQueryable().Where(filter).Count();

            int pageCount = rsCount / pageSize + ((rsCount % pageSize) > 0 ? 1 : 0);
            if (pageIndex > pageCount) pageIndex = pageCount;
            if (pageIndex <= 0) pageIndex = 1;            
            return colleciton.AsQueryable(new AggregateOptions { AllowDiskUse = true }).Where(filter).OrderBy(keySelector).Skip((pageIndex - 1) * pageSize).Take(pageSize).ToList();
        }

        public void Insert<TDoc>(TDoc doc, InsertOneOptions options = null)
        {
            string collectionName = typeof(TDoc).Name;
            Insert<TDoc>(collectionName, doc, options);
        }

        public void Insert<TDoc>(string collectionName, TDoc doc, InsertOneOptions options = null)
        {
            var colleciton = GetMongoCollection<TDoc>(collectionName);
            colleciton.InsertOne(doc, options);
        }


        public void InsertMany<TDoc>(IEnumerable<TDoc> docs, InsertManyOptions options = null)
        {
            string collectionName = typeof(TDoc).Name;
            InsertMany<TDoc>(collectionName, docs, options);
        }

        public void InsertMany<TDoc>(string collectionName, IEnumerable<TDoc> docs, InsertManyOptions options = null)
        {
            var colleciton = GetMongoCollection<TDoc>(collectionName);
            colleciton.InsertMany(docs, options);
        }

        public void Update<TDoc>(TDoc doc, Expression<Func<TDoc, bool>> filter, UpdateOptions options = null)
        {
            string collectionName = typeof(TDoc).Name;
            var colleciton = GetMongoCollection<TDoc>(collectionName);
            List<UpdateDefinition<TDoc>> updateList = BuildUpdateDefinition<TDoc>(doc, null);
            colleciton.UpdateOne(filter, Builders<TDoc>.Update.Combine(updateList), options);
        }

        public void Update<TDoc>(string collectionName, TDoc doc, Expression<Func<TDoc, bool>> filter, UpdateOptions options = null)
        {
            var colleciton = GetMongoCollection<TDoc>(collectionName);
            List<UpdateDefinition<TDoc>> updateList = BuildUpdateDefinition<TDoc>(doc, null);
            colleciton.UpdateOne(filter, Builders<TDoc>.Update.Combine(updateList), options);
        }


        public void Update<TDoc>(TDoc doc, Expression<Func<TDoc, bool>> filter, UpdateDefinition<TDoc> updateFields, UpdateOptions options = null)
        {
            string collectionName = typeof(TDoc).Name;
            Update<TDoc>(collectionName, doc, filter, updateFields, options);
        }

        public void Update<TDoc>(string collectionName, TDoc doc, Expression<Func<TDoc, bool>> filter, UpdateDefinition<TDoc> updateFields, UpdateOptions options = null)
        {
            var colleciton = GetMongoCollection<TDoc>(collectionName);
            colleciton.UpdateOne(filter, updateFields, options);
        }


        public void UpdateMany<TDoc>(TDoc doc, Expression<Func<TDoc, bool>> filter, UpdateOptions options = null)
        {
            string collectionName = typeof(TDoc).Name;
            UpdateMany<TDoc>(collectionName, doc, filter, options);
        }


        public void UpdateMany<TDoc>(string collectionName, TDoc doc, Expression<Func<TDoc, bool>> filter, UpdateOptions options = null)
        {
            var colleciton = GetMongoCollection<TDoc>(collectionName);
            List<UpdateDefinition<TDoc>> updateList = BuildUpdateDefinition<TDoc>(doc, null);
            colleciton.UpdateMany(filter, Builders<TDoc>.Update.Combine(updateList), options);
        }


        public void Delete<TDoc>(Expression<Func<TDoc, bool>> filter, DeleteOptions options = null)
        {
            string collectionName = typeof(TDoc).Name;
            Delete<TDoc>(collectionName, filter, options);
        }

        public void Delete<TDoc>(string collectionName, Expression<Func<TDoc, bool>> filter, DeleteOptions options = null)
        {
            var colleciton = GetMongoCollection<TDoc>(collectionName);
            DeleteResult deleteResult= colleciton.DeleteOne(filter, options);
            Console.WriteLine(deleteResult.DeletedCount);
        }


        public void DeleteMany<TDoc>(Expression<Func<TDoc, bool>> filter, DeleteOptions options = null)
        {
            string collectionName = typeof(TDoc).Name;
            DeleteMany<TDoc>(collectionName, filter, options);
        }


        public void DeleteMany<TDoc>(string collectionName, Expression<Func<TDoc, bool>> filter, DeleteOptions options = null)
        {
            var colleciton = GetMongoCollection<TDoc>(collectionName);
            colleciton.DeleteMany(filter, options);
        }

        public void ClearCollection<TDoc>(string collectionName)
        {
            var colleciton = GetMongoCollection<TDoc>(collectionName);
            var inddexs = colleciton.Indexes.List();
            List<IEnumerable<BsonDocument>> docIndexs = new List<IEnumerable<BsonDocument>>();
            while (inddexs.MoveNext())
            {
                docIndexs.Add(inddexs.Current);
            }
            var mongoDatabase = GetMongoDatabase();
            mongoDatabase.DropCollection(collectionName);

            if (!CollectionExists(mongoDatabase, collectionName))
            {
                CreateCollection<TDoc>(collectionName);
            }

            if (docIndexs.Count > 0)
            {
                colleciton = mongoDatabase.GetCollection<TDoc>(collectionName);
                foreach (var index in docIndexs)
                {
                    foreach (IndexKeysDefinition<TDoc> indexItem in index)
                    {
                        try
                        {
                            colleciton.Indexes.CreateOne(new CreateIndexModel<TDoc>(indexItem));
                        }
                        catch
                        { }
                    }
                }
            }

        }

    }
}

 

 测试

var mongoDbHelper = new MongoDBHelper("mongodb://127.0.0.1:27017", "LogDB",true,true);

            mongoDbHelper.CreateCollection<SysLogInfo>("SysLog1", new[] { "LogDT" });

            mongoDbHelper.Insert<SysLogInfo>("SysLog1", new SysLogInfo { LogDT = DateTime.Now, Level = "Info", Msg = "测试消息" });

            mongoDbHelper.Find<SysLogInfo>("SysLog1", t => t.Level == "Info")
                .ForEach(doc => Console.WriteLine(System.Text.Json.JsonSerializer.Serialize(doc)));

            System.Collections.Generic.List<SysLogInfo> list = new System.Collections.Generic.List<SysLogInfo>();

            for (int i = 0; i < 100; i++)
            {
                list.Add(new SysLogInfo(i, DateTime.Now, "Info", "你好"));
            }


            mongoDbHelper.InsertMany<SysLogInfo>("SysLog1", list);

            int rsCount = 0;
            var result = mongoDbHelper.FindByPage<SysLogInfo, Object>("SysLog1", t => t.Level == "Info", t => t._id, 1, 20, out rsCount);

            result.ForEach(doc => Console.WriteLine(System.Text.Json.JsonSerializer.Serialize(doc)));

            mongoDbHelper.Update<SysLogInfo>("SysLog1", new SysLogInfo { _id = "Code", LogDT = DateTime.Now, Level = "Error", Msg = "测试消息2" }, t => t.LogDT >= new DateTime(1900, 1, 1));


            mongoDbHelper.Delete<SysLogInfo>("SysLog1", t => t.Level == "Info");

            mongoDbHelper.ClearCollection<SysLogInfo>("SysLog1");

 

实体类

using System;
using System.Collections.Generic;
using System.Text;

namespace MongoDBDriverDemo
{
    public class SysLogInfo
    {
        public object _id { get; set; }

        public DateTime LogDT { get; set; }

        public string Level { get; set; }

        public string Msg { get; set; }       

        public SysLogInfo()
        {

        }

        public SysLogInfo(object id, DateTime logDT,string level,string msg)
        {
            this._id = id;
            this.Msg = msg;
            this.LogDT = logDT;
            this.Level = level;
        }
    }
}

 二.Java驱动连接MongoDB

1.新建项目

 

 

 2.导入pom坐标

<dependencies>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongo-java-driver</artifactId>
        <version>3.12.1</version>
    </dependency>
</dependencies>

3.测试

package cn.lb.entity;


import com.mongodb.Block;
import com.mongodb.client.*;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.UpdateResult;
import org.bson.Document;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static com.mongodb.client.model.Filters.*;
import static com.mongodb.client.model.Updates.inc;

public class mongodbTest {

    private  static  MongoCursor<Document> cursor=null;

    public static void main(String[] args) {
        try{
            MongoClient mongoClient= MongoClients.create("mongodb://47.100.46.200:27017");

            MongoDatabase database=mongoClient.getDatabase("mydb");

            MongoCollection<Document> collection=database.getCollection("test");

            Document doc = new Document("name", "MongoDB")
                    .append("type", "database")
                    .append("count", 1)
                    .append("versions", Arrays.asList("v3.2", "v3.0", "v2.6"))
                    .append("info", new Document("x", 203).append("y", 102));

            System.out.println("--------------插入一条------------------");

            collection.insertOne(doc);

            System.out.println("--------------插入多条------------------");

            List<Document> documentList=new ArrayList<Document>();
            for (int i = 0; i < 100; i++) {
                documentList.add(new Document("i",i));
            }

            collection.insertMany(documentList);

            System.out.println("文档总数:"+collection.countDocuments());

            Document myDoc=collection.find().first();

            System.out.println("第一个文档:"+myDoc.toJson());

            cursor=collection.find().iterator();

            while(cursor.hasNext())
            {
                System.out.println(cursor.next().toJson());
            }

            for (Document cur:collection.find())
            {
                System.out.println(cur.toJson());
            }

            System.out.println("------------过滤---------------");

            myDoc=collection.find(eq("i",71)).first();

            System.out.println(myDoc);

            Block<Document> printBlock=new Block<Document>() {
                @Override
                public void apply(Document document) {
                    System.out.println(document.toString());
                }
            };

            collection.find(gt("i",50)).forEach(printBlock);


            collection.find(and(gt("i",50),lte("i",100))).forEach(printBlock);

            System.out.println("---------------更新文档-------------------");

            collection.updateOne(eq("i",10),new Document("$set",new Document("i",10)));

            UpdateResult updateResult=collection.updateMany(lt("i",100),inc("i",100));

            System.out.println(updateResult. getModifiedCount());

            System.out.println("-----------------删除文档-------------------");

            collection.deleteOne(eq("i",110));

            DeleteResult deleteResult=collection.deleteMany(gte("i",100));

            System.out.println(deleteResult.getDeletedCount());

            collection.createIndex(new Document("i",1));

        }catch (Exception ex)
        {
            ex.printStackTrace();
        }
        finally {
           if (cursor!=null)
           {
               cursor.close();
           }
        }
    }
}

4.结果

 

 

 

Java封装MongoDB

导入pom坐标

 

<dependencies>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-java-driver</artifactId>
            <version>3.12.1</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.9</version>
        </dependency>
    </dependencies>

 

添加类

 

package cn.lb.util;

import com.mongodb.MongoClient;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.result.UpdateResult;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.bson.Document;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;


public class MongoDBUtil {
    private static MongoDBUtil mongoDBUtil;

    private static final String PLEASE_SEND_IP = "没有传入ip或者端口号";
    private static final String PLEASE_INSTANCE_MONGOCLIENT = "请实例化MongoClient";
    private static final String PLEASE_SEND_MONGO_REPOSITORY = "请指定要删除的mongo库";
    private static final String DELETE_MONGO_REPOSITORY_EXCEPTION = "删除mongo库异常";
    private static final String DELETE_MONGO_REPOSITORY_SUCCESS = "批量删除mongo库成功";
    private static final String NOT_DELETE_MONGO_REPOSITORY = "未删除mongo库";
    private static final String DELETE_MONGO_REPOSITORY = "成功删除mongo库:";
    private static final String CREATE_MONGO_COLLECTION_NOTE = "请指定要创建的库";
    private static final String NO_THIS_MONGO_DATABASE = "未找到指定mongo库";
    private static final String CREATE_MONGO_COLLECTION_SUCCESS = "创建mongo库成功";
    private static final String CREATE_MONGO_COLLECTION_EXCEPTION = "创建mongo库错误";
    private static final String NOT_CREATE_MONGO_COLLECTION = "未创建mongo库collection";
    private static final String CREATE_MONGO_COLLECTION_SUCH = "创建mongo库collection:";
    private static final String NO_FOUND_MONGO_COLLECTION = "未找到mongo库collection";
    private static final String INSERT_DOCUMEN_EXCEPTION = "插入文档失败";
    private static final String INSERT_DOCUMEN_SUCCESSS = "插入文档成功";


    private static final Logger logger = Logger.getLogger(MongoDBUtil.class);

    private MongoDBUtil(){

    }

    private static class SingleHolder{
        private static MongoDBUtil mongoDBUtil = new MongoDBUtil();
    }

    public static MongoDBUtil instance(){

        return SingleHolder.mongoDBUtil;
    }

    public static MongoDBUtil getMongoDBUtilInstance(){
        if(mongoDBUtil == null){
            return new MongoDBUtil();
        }
        return mongoDBUtil;
    }

    /**
     * 获取mongoDB连接
     * @param host
     * @param port
     * @return
     */
    public MongoClient getMongoConnect(String host,Integer port){

        if(StringUtils.isBlank(host) || null == port){
            logger.error(PLEASE_SEND_IP);
            return null;
        }

        return new MongoClient(host, port);
    }


    /**
     * 批量删除mongo库
     * @param mongoClient
     * @param dbNames
     * @return
     */
    public String bulkDropDataBase(MongoClient mongoClient,String...dbNames){

        if(null == mongoClient) return PLEASE_INSTANCE_MONGOCLIENT;

        if(null==dbNames || dbNames.length==0){
            return PLEASE_SEND_MONGO_REPOSITORY;
        }
        try {
            Arrays.asList(dbNames).forEach(dbName -> mongoClient.dropDatabase(dbName));
            logger.info(DELETE_MONGO_REPOSITORY_SUCCESS);
        }catch (Exception e){
            e.printStackTrace();
            logger.error(DELETE_MONGO_REPOSITORY_EXCEPTION);
        }
        return dbNames == null ? NOT_DELETE_MONGO_REPOSITORY:DELETE_MONGO_REPOSITORY + String.join(",",dbNames);
    }


    /**
     * 创建指定database的collection
     * @param mongoClient
     * @param dbName
     * @param collections
     * @return
     */
    public String createCollections(MongoClient mongoClient,String dbName,String...collections){

        if(null == mongoClient) return PLEASE_INSTANCE_MONGOCLIENT;

        if(null==collections || collections.length==0){
            return CREATE_MONGO_COLLECTION_NOTE;
        }

        MongoDatabase mongoDatabase = mongoClient.getDatabase(dbName);
        if(null == mongoDatabase) return NO_THIS_MONGO_DATABASE;

        try {
            Arrays.asList(collections).forEach(collection ->  mongoDatabase.createCollection(collection));
            logger.info(CREATE_MONGO_COLLECTION_SUCCESS);
            return collections == null ? NOT_CREATE_MONGO_COLLECTION:CREATE_MONGO_COLLECTION_SUCH + String.join(",",collections);
        }catch (Exception e){
            e.printStackTrace();
            logger.error(CREATE_MONGO_COLLECTION_EXCEPTION);
        }

        return null;
    }

    /**
     * 获取MongoCollection
     * @param mongoClient
     * @param dbName
     * @param collection
     * @return
     */
    public MongoCollection<Document> getMongoCollection(MongoClient mongoClient,String dbName,String collection){

        if(null == mongoClient) return null;

        if(StringUtils.isBlank(dbName)) return null;

        if(StringUtils.isBlank(collection)) return null;

        MongoDatabase mongoDatabase = mongoClient.getDatabase(dbName);

        MongoCollection<Document> collectionDocuments = mongoDatabase.getCollection(collection);

        if(null == collectionDocuments) return null;

        return collectionDocuments;
    }

    /**
     * 获取到MongoClient
     * @param ip
     * @param port
     * @param userName
     * @param dbName
     * @param psw
     * @returnMongoClient
     */
    public static MongoClient getMongoClientByCredential(String ip,int port,String userName,String dbName,String psw){
        ServerAddress serverAddress = new ServerAddress(ip,port);
        List<ServerAddress> addrs = new ArrayList<ServerAddress>();
        addrs.add(serverAddress);

        //MongoCredential.createScramSha1Credential()三个参数分别为 用户名 数据库名称 密码
        MongoCredential credential = MongoCredential.createScramSha1Credential(userName, dbName, psw.toCharArray());
        List<MongoCredential> credentials = new ArrayList<MongoCredential>();
        credentials.add(credential);

        //通过连接认证获取MongoDB连接
        MongoClient mongoClient = new MongoClient(addrs,credentials);
        return mongoClient;
    }


    /**
     * 插入文档数据
     * @param mongoCollection
     * @param params
     */
    public void insertDoucument(final MongoCollection<Document> mongoCollection, final Map<String,Object> params){
        if(null == mongoCollection) {
            logger.info(NO_FOUND_MONGO_COLLECTION);
            return;
        }

        try {
            Document document = new Document();
            params.keySet().stream().forEach(field -> document.append(field, params.get(field)));

            List<Document> documents = new ArrayList<>();
            documents.add(document);
            mongoCollection.insertMany(documents);
            logger.info(INSERT_DOCUMEN_SUCCESSS);
        }catch (Exception e){
            e.printStackTrace();
            logger.error(INSERT_DOCUMEN_EXCEPTION);
        }
    }

    /**
     * 更新文档
     * @param mongoCollection
     * @param conditionParams
     * @param updateParams
     */
    public  void updateDocument(final MongoCollection<Document> mongoCollection,final Map<String,Object> conditionParams,
                                final Map<String,Object> updateParams,final boolean MultiUpdate
    ){

        if(null == mongoCollection) return;

        if (null == conditionParams) return;

        if (null == updateParams) return;


        Document conditonDocument = new Document();
        conditionParams.keySet().stream().filter(p -> null != p).forEach(o -> {
            conditonDocument.append(o,conditionParams.get(o));
        });


        Document updateDocument = new Document();
        updateParams.keySet().stream().filter(p -> null != p).forEach(o -> {
            updateDocument.append(o,updateParams.get(o));
        });
        UpdateResult updateResult = null;
        if (MultiUpdate){//是否批量更新
            updateResult = mongoCollection.updateMany(conditonDocument,new Document("$set",updateDocument));
        }else {
            updateResult = mongoCollection.updateOne(conditonDocument,new Document("$set",updateDocument));
        }
        System.out.println("修改了:"+updateResult.getModifiedCount()+" 条数据 ");

    }

    /**
     *条件 删除文档 是否多条删除
     * @param mongoCollection
     * @param multiple
     * @param conditionParams
     * @return
     */
    public long deleteDocument(final MongoCollection<Document> mongoCollection,final boolean multiple,
                               final Map<String,Object> conditionParams){

        if(null == mongoCollection) return 0;

        if(null == conditionParams) return 0;

        Document document = new Document();

        conditionParams.keySet().stream().filter(p -> null != p).forEach(o -> {
            document.append(o,conditionParams.get(o));
        });

        if(multiple) {
            return mongoCollection.deleteMany(document).getDeletedCount();
        }

        //删除文档第一条
        return mongoCollection.deleteOne(document).getDeletedCount();
    }

    /**
     * 查询文档 带条件、范围查找、排序、分页
     * @param mongoCollection
     * @param conditionParams
     * @param limit
     * @param skip
     * @param sortParams
     */
    public FindIterable<Document> queryDocument(final MongoCollection<Document> mongoCollection, final Map<String,Object> conditionParams,
                                                final String op,final String compareField, final Map<String,Integer> gtLtOrOtherParams,
                                                final Map<String,Object> sortParams,final Integer skip,final Integer limit
    ){

        if(null == mongoCollection) return null;

        FindIterable<Document> findIterable = mongoCollection.find();
        Document conditionDocument = new Document();
        Document compareDocument = new Document();

        if(null != conditionParams && null != findIterable){

            conditionParams.forEach((k,v) ->{
                if (StringUtils.isNotBlank(k)) {
                    conditionDocument.append(k,v);
                }
            });

            findIterable = findIterable.filter(conditionDocument);

            MongoCursor<Document> mongoCursor = findIterable.iterator();
            while(mongoCursor.hasNext()){
                System.out.println("条件过滤  -->"+mongoCursor.next());
            }
        }

        if(null != findIterable && null != gtLtOrOtherParams){

            Document gtOrLtDoc = new Document();
            gtLtOrOtherParams.forEach((k,v) -> {
                if(StringUtils.isNotBlank(k)) gtOrLtDoc.append(k,v);
            });

            compareDocument = new Document(compareField,gtOrLtDoc);
            findIterable = findIterable.filter(new Document(compareField,compareDocument));
        }

        if (StringUtils.isNotBlank(op)){
            if ("and".equals(op)){
                findIterable = mongoCollection.find(Filters.and(conditionDocument,compareDocument));
            }else if("or".equals(op)){
                findIterable = mongoCollection.find(Filters.or(conditionDocument,compareDocument));
            }else if("not".equals(op)){//排除范围
                findIterable = mongoCollection.find(Filters.and(conditionDocument,Filters.not(compareDocument)));
            }
        }else{//默认是AND查询
            findIterable = mongoCollection.find(Filters.and(conditionDocument,compareDocument));
        }
        MongoCursor<Document> mongoCursor3 = findIterable.iterator();
        while(mongoCursor3.hasNext()){
            System.out.println(op+"过滤  -->"+mongoCursor3.next());
        }

        if(null != sortParams){
            Document sortDocument = new Document();
            sortParams.forEach((k,v) ->{
                if (StringUtils.isNotBlank(k)) {
                    sortDocument.append(k,v);
                }
            });

            findIterable = findIterable.sort(sortDocument);

            MongoCursor<Document> mongoCursor2 = findIterable.iterator();
            while(mongoCursor2.hasNext()){
                System.out.println("排序  -->"+mongoCursor2.next());
            }
        }



        if(null != findIterable && null != limit){
            findIterable = findIterable.limit(limit);
        }
        if(null != findIterable && null != skip){
            findIterable = findIterable.skip(skip);
        }

        return findIterable;
    }


    /**
     * in查询
     * @param mongoCollection
     * @return
     */
    public FindIterable<Document>  queryDocumentIn(final MongoCollection<Document> mongoCollection,String field, List<String> list
    ){

        if(null == mongoCollection) return null;
        FindIterable<Document> findIterable = mongoCollection.find(new Document(field,new Document("$in",list)));
        return findIterable;
    }


    /**
     * 全文查询
     * @param mongoCollection
     * @return
     */
    public FindIterable<Document>  queryDocument(final MongoCollection<Document> mongoCollection
    ){
        if(null == mongoCollection) return null;
        FindIterable<Document> findIterable = mongoCollection.find();
        return findIterable;
    }


    /**
     * 查询文档 简单条件查询
     * @param mongoCollection
     * @param conditionParams
     * @return
     */
    public FindIterable<Document> queryDocument(final MongoCollection<Document> mongoCollection, final Map<String,Object> conditionParams
    ){

        if(null == mongoCollection) return null;

        FindIterable<Document> findIterable = mongoCollection.find();

        if(null == conditionParams || null == findIterable) return findIterable;

        Document document = new Document();
        conditionParams.forEach((k,v)->{
            if (StringUtils.isNotBlank(k)) {
                document.append(k,v);
            }
        });
        findIterable = findIterable.filter(document);

        return findIterable;

    }


    /**
     * 用于输出部分的列信息
     * @param documents
     */
    public static void printDocuments(FindIterable<Document> documents, String[] fields) {
        if (fields != null && fields.length > 0) {
            int num = 0;
            for (Document d : documents) {
                StringBuilder stringBuilder = new StringBuilder();
                for (int i = 0; i < fields.length; i++) {
                    /*if(fields[i].equals("catm")){

                    }*/
                    stringBuilder.append(fields[i] + ": "+d.getString(fields[i])+" ");
                }
                System.out.println("第" + (++num) + "条数据: " + stringBuilder);

            }
        }else{
            for (Document d : documents) {
                System.out.println(d.toString());
            }
        }
    }

    /**
     * 用于输出所有的列信息
     * @param documents
     */
    public void printDocuments(FindIterable<Document> documents) {
        int num = 0;
        for (Document d : documents) {
            System.out.println("第" + (++num) + "条数据: " + d.toString());
        }
    }

}

 

 

C#API参考地址:https://api.mongodb.com/csharp/2.2/html/N_MongoDB_Driver.htm

C#驱动连接文档地址:http://mongodb.github.io/mongo-csharp-driver/2.10/getting_started/quick_tour/

Java驱动连接文档地址:http://mongodb.github.io/mongo-java-driver/3.12/driver/getting-started/installation/

Cassandra Java驱动程序的最佳设置只能写入本地数据中心

Cassandra Java驱动程序的最佳设置只能写入本地数据中心

我最近开始为我们的Cassandra用例使用Datastax Java驱动程序…我们将使用Datastax Java驱动程序读取/写入Cassandra …

我成功地可以使用Datastax Java驱动程序创建Cassandra连接…但是我想知道,在生产环境中是否还有其他设置可以使用Datastax Java驱动程序在连接Cassandra时获得更好的性能?

/**
 * Creating Cassandra connection using Datastax driver
 *
 */
private DatastaxConnection() {

    try{
        builder = Cluster.builder();
        builder.addContactPoint("some-node");

        // Can anybody explain me what does below piece of code do?

        builder.poolingOptions().setCoreConnectionsPerHost(
                Hostdistance.LOCAL,builder.poolingOptions().getMaxConnectionsPerHost(Hostdistance.LOCAL));

        // And also what does below piece of code is doing?       
        cluster = builder
                .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
                .withReconnectionPolicy(new ConstantReconnectionPolicy(100L))
                .build();

        StringBuilder s = new StringBuilder();
        Set<Host> allHosts = cluster.getMetadata().getAllHosts();
        for (Host h : allHosts) {
            s.append("[");
            s.append(h.getDatacenter());
            s.append("-");
            s.append(h.getRack());
            s.append("-");
            s.append(h.getAddress());
            s.append("]");
        }
        System.out.println("Cassandra Cluster: " + s.toString());

        session = cluster.connect("testdatastaxks");
    } catch (NoHostAvailableException e) {

    } catch (Exception e) {

    }
}

我的首要任务是:

>根据本地数据中心过滤掉Cassandra节点.因此,在连接池中,只有本地数据中心Cassandra节点.
>在使用Datastax java驱动程序时,可以获得最佳的性能,并具有一定的设置.

我知道某些设置可能会在不同的环境中有所不同,但是在使用Datastax Java驱动程序进行Cassandra连接时,可能需要遵循一些设置来获得最佳性能.

就像在Astyanax早期使用的一个例子,那就是你需要使用TOKEN_AWARE …

所以应该有一些最好的设置,或者推荐使用Datastax java驱动程序?

解决方法

Filter out the Cassandra nodes basis on local datacenter.. So in the connection pool it will only have local datacenter Cassandra nodes

那么你需要使用DCAwareRoundRobinPolicy.

Like for an example in Astyanax when I was using earlier,it was that you need to use TOKEN_AWARE…

对于DataStax Java驱动程序也是如此,它被称为TokenAwarePolicy,可以在上面引用的DCAwareRoundRobinPolicy之上使用.

I kNow it might be possible that certain settings will differ in different environment but there might be some settings that everybody has to follow to get the optimal performance while making the Cassandra connections using Datastax Java driver..

我不能说“每个人”,但是除了上述负载平衡策略的适当选择之外,其余的将最有可能是环境依赖的.但是,当然如果你关心性能,那么从Configuration和一些现实的工作负载播放各种设置是一个好主意,看看有没有帮助.

Cassandra Java驱动程序:有多少接触点合理?

Cassandra Java驱动程序:有多少接触点合理?

在 Java中,我连接到Cussandra集群,如下所示:
Cluster cluster = Cluster.builder().addContactPoints("host-001","host-002").build();

我需要在那里指定集群的所有主机吗?如果我有一个1000个节点的集群?我随机选择几个?有多少,我真的做这个随机吗?

解决方法

我会说,配置您的客户端使用与您配置Cassandra使用的种子节点列表相同的节点列表将给您最好的结果.

如你所知,Cassandra节点使用种子节点找到对方并发现环的拓扑.驱动程序将仅使用列表中提供的一个节点来建立控制连接,用于发现集群拓扑的控制连接,但为客户端提供种子节点将增加客户端继续运行的机会,以便在节点故障.

今天关于与MongoDB相比,使用Java驱动程序进行Cassandra批量写入的性能非常差类似mongodb的数据库的介绍到此结束,谢谢您的阅读,有关Android Studio:使用Mongo Java驱动程序连接到MongoDB服务器、C#、Java驱动连接MongoDB以及封装(C#的MongoDBHelper,Java的MongoDBUtil)、Cassandra Java驱动程序的最佳设置只能写入本地数据中心、Cassandra Java驱动程序:有多少接触点合理?等更多相关知识的信息可以在本站进行查询。

本文标签: