For readers who want to learn about concurrency in Hibernate, this is an article not to be missed. It also provides valuable material on Concurrency Learning 1, Configuring jdbc-pool for high-concurrency, Go Concurrency Patterns: Timing out, moving on, and Go Concurrency Patterns: Context.
Contents:

- concurrency in hibernate
- Concurrency Learning 1
- Configuring jdbc-pool for high-concurrency
- Go Concurrency Patterns: Timing out, moving on
- Go Concurrency Patterns: Context
concurrency in hibernate
I have a servlet that does some work for a user and then decrements the user's credit. When I watch the user's credit in the database in real time, the credit is deducted incorrectly when there are many concurrent requests from the same user, due to missing concurrency control. I have a single server, and database access is managed by Hibernate. I am using a transaction that spans the whole request; see the code for details. My questions:

1. When facing multiple concurrent requests from the same user, why does the credit counter in the DB jump all over the place? Why doesn't my transaction control work?
2. If the underlying data was modified after the user account was retrieved and before the update, why don't I get any HibernateException (e.g. StaleObjectStateException)?
3. My transaction spans the whole user request; is there a better approach? Please criticize. If you feel I am doing everything wrong, feel free to restructure the sample code.
Main servlet class:

protected void doGet(HttpServletRequest request, HttpServletResponse response)
        throws ServletException, IOException {
    try {
        Manager.beginTransaction();
        cmdDowork(request, response);
        Manager.commitTransaction();
    } catch (Exception exp) {
        Manager.rollbackTransaction();
        exp.printStackTrace();
    } finally {
        Manager.closeSession();
    }
}

public void cmdDowork(HttpServletRequest request, HttpServletResponse response) {
    try {
        UserAccount userAccount = lazyGetUserAccount(request.getParameter("userName"));
        doWorkForUser(userAccount); // time and resource consuming process
        if (userAccount != null) {
            decUserAccountQuota(userAccount);
        }
    } catch (HibernateException e) {
        e.printStackTrace();
    }
}

public static UserAccount lazyGetUserAccount(String userName) {
    UserAccount userAccount = Manager.getUserAccount(userName);
    if (userAccount == null) {
        userAccount = new UserAccount(userName);
        userAccount.setReserve(DEFAULT_USER_QUOTA);
        userAccount.setBalance(DEFAULT_USER_QUOTA);
        Manager.saveUserAccount(userAccount);
    }
    return userAccount;
}

private boolean decUserAccountQuota(UserAccount userAccount) {
    if (userAccount.getBalance() // (rest of the method truncated in the original post)

Edit: code I used to test optimistic locking as suggested by the answer; I am not getting any StaleObjectStateException, the updates were committed successfully.

Session em1 = Manager.sessionFactory.openSession();
Session em2 = Manager.sessionFactory.openSession();
em1.getTransaction().begin();
em2.getTransaction().begin();
UserAccount c1 = (UserAccount) em1.get(UserAccount.class, "jonathan");
UserAccount c2 = (UserAccount) em2.get(UserAccount.class, "jonathan");
c1.setBalance(c1.getBalance() - 1);
em1.flush();
em1.getTransaction().commit();
System.out.println("balance1 is " + c2.getBalance());
c2.setBalance(c2.getBalance() - 1);
em2.flush(); // fail
em2.getTransaction().commit();
System.out.println("balance2 is " + c2.getBalance());
Answer 1

You have two ways to handle this situation: pessimistic locking or optimistic locking. But you do not seem to use either, which probably explains the incorrect behavior.
With optimistic locking, Hibernate checks that the user account was not changed between reading it and saving it. A concurrent transaction may then fail and be rolled back.

With pessimistic locking, you lock the row when you read it and release the lock only when the transaction completes. This prevents concurrent transactions from reading data that would become stale.

Refreshing the entity may or may not read fresh data, depending on whether the current transaction has already been committed, so that is not a solution either. Since you also create the user account when it does not exist, you cannot apply pessimistic locking that easily. I suggest you use optimistic locking instead (e.g. using a version or timestamp to detect concurrent modifications).
Read another question on SO about pessimistic vs. optimistic locking. Also have a look at the Hibernate chapters "Transactions and concurrency" and "Hibernate annotations".

It should be as simple as adding @Version on the appropriate field; the default optimisticLockStrategy is VERSION (a separate column is used).
Update:
You can test whether it works with a test case. I created a simple Counter entity with id, value, and version fields.
public class Counter implements Serializable {
    @Id
    @GeneratedValue(strategy=GenerationType.AUTO)
    @Basic(optional = false)
    @Column(name = "ID")
    private Integer id;

    @Column(name = "VALUE")
    private Integer value;

    @Column(name = "VERSION")
    @Version
    private Integer version;
    ...
}
If you update the entity sequentially, it works:
id = insertEntity( ... );
em1.getTransaction().begin();
Counter c1 = em1.find( Counter.class, id );
c1.setValue( c1.getValue() + 1 );
em1.flush();
em1.getTransaction().commit();
em2.getTransaction().begin();
Counter c2 = em2.find( Counter.class, id );
c2.setValue( c2.getValue() + 1 );
em2.flush(); // OK
em2.getTransaction().commit();
I end up with one entity with value=2 and version=2.
If I simulate two concurrent updates:
id = insertEntity( ... );
em1.getTransaction().begin();
em2.getTransaction().begin();
Counter c1 = em1.find( Counter.class, id );
Counter c2 = em2.find( Counter.class, id );
c1.setValue( c1.getValue() + 1 );
em1.flush();
em1.getTransaction().commit();
c2.setValue( c2.getValue() + 1 );
em2.flush(); // fail
em2.getTransaction().commit();
then the second flush fails:
Hibernate: update COUNTER set VALUE=?, VERSION=? where ID=? and VERSION=?
Hibernate: update COUNTER set VALUE=?, VERSION=? where ID=? and VERSION=?
Dec 23, 2009 11:08:46 AM org.hibernate.event.def.AbstractFlushingEventListener performExecutions
SEVERE: Could not synchronize database state with session
org.hibernate.StaleObjectStateException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect): [org.ewe.Counter#15]
    at org.hibernate.persister.entity.AbstractEntityPersister.check(AbstractEntityPersister.java:1765)
This is because the actual parameters in the SQL statements are:
update COUNTER set VALUE=1, VERSION=1 where ID=xxx and VERSION=0
--> 1 row updated
update COUNTER set VALUE=1, VERSION=1 where ID=xxx and VERSION=0
--> 0 rows updated, because the version has been changed in between
Concurrency Learning 1
The point of instruction reordering is that the JVM can reorder machine instructions according to the characteristics of the processor (multi-level CPU caches, multiple cores, and so on), so that the instructions better match how the CPU executes and the machine's performance is exploited to the fullest.

Stateless objects and safety:
- Thread safety: if multiple threads in a process may run a piece of code at the same time, and every run produces the same result as a single-threaded run, with all other variables also holding their expected values, then the code is thread safe.
- Stateful means having data-storage capability. A stateful object (Stateful Bean) has instance variables and can hold data across method calls; it is not thread safe.
- Stateless means a one-shot operation that stores no data. A stateless object (Stateless Bean) has no instance variables, cannot hold data, is an immutable class, and is thread safe. (See http://www.cnblogs.com/MRRAOBX/articles/4118573.html)

The complete happens-before rules:
- Each action in a thread happens-before every action that comes after it in the same thread.
- An unlock on a monitor happens-before every subsequent lock on that same monitor. (This guarantees that only one thread at a time holds a given lock.)
- A write to a volatile field happens-before every subsequent read of that same field. (This is volatile's visibility guarantee.)
- A call to Thread.start() happens-before every action in the started thread.
- All actions in a thread happen-before any other thread detects that the thread has terminated, whether by returning from Thread.join() or by Thread.isAlive() returning false.
- A call by thread A to another thread B's interrupt() happens-before A detects that B has been interrupted (B throwing an exception, or A detecting it via B's isInterrupted() or interrupted()).
- The end of an object's constructor happens-before the start of that object's finalizer.
- If action A happens-before action B, and B happens-before action C, then A happens-before C (transitivity).
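The volatile rule above is the one most often relied on directly in application code. A minimal runnable sketch (class and field names are illustrative, not from the original notes):

```java
public class VolatileFlag {
    // The volatile write to `ready` happens-before any subsequent read of it,
    // so once the reader sees ready == true, it is also guaranteed to see
    // the earlier ordinary write data = 42.
    private static volatile boolean ready = false;
    private static int data = 0;

    public static void main(String[] args) throws InterruptedException {
        Thread reader = new Thread(() -> {
            while (!ready) { /* spin until the volatile write becomes visible */ }
            System.out.println(data); // guaranteed to print 42
        });
        reader.start();

        data = 42;    // ordinary write ...
        ready = true; // ... published by the volatile write
        reader.join();
    }
}
```

Without the volatile modifier, the reader thread could spin forever or observe data == 0, since no happens-before edge would order the two writes with the reads.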
Non-blocking algorithms:

CAS

An exclusive lock is a pessimistic lock; synchronized is one example, and it suspends all other threads that need the lock until the holder releases it. A more effective alternative is the optimistic lock: instead of locking, assume there is no conflict and attempt the operation; if it fails due to a conflict, simply retry until it succeeds. (CAS is one implementation of optimistic locking.) CAS has three operands: the memory value V, the expected old value A, and the new value B. If and only if A equals V, V is updated to B; otherwise nothing is done.

Atomic operations: popular atomic
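The CAS retry loop described above can be sketched with the JDK's java.util.concurrent.atomic package (the class name is illustrative):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CasCounter {
    private final AtomicInteger value = new AtomicInteger(0);

    // Optimistic increment: read the current value, then try to swap in the
    // new one; on conflict, retry instead of blocking.
    public int increment() {
        while (true) {
            int current = value.get();  // expected value A
            int next = current + 1;     // new value B
            if (value.compareAndSet(current, next)) { // succeeds only if V == A
                return next;
            }
            // another thread won the race; loop and retry
        }
    }

    public int get() { return value.get(); }

    public static void main(String[] args) throws InterruptedException {
        CasCounter counter = new CasCounter();
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) counter.increment();
            });
            threads[i].start();
        }
        for (Thread t : threads) t.join();
        System.out.println(counter.get()); // 4000: no lost updates
    }
}
```

Four threads performing 1000 lock-free increments each always end at 4000; a plain int++ under the same workload would usually lose updates.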
Configuring jdbc-pool for high-concurrency
In this article we will focus on configuration of the high-concurrency connection pool. For ease of migration for Tomcat users, the configuration has been written to mimic that of Commons DBCP.
The documentation of jdbc-pool covers all the attributes. Please note that these attributes are also available as direct setters on the org.apache.tomcat.jdbc.pool.DataSource bean if you're using a dependency injection framework. So in this article we will focus on use cases and different configurations for Tomcat.
Simple Connection Pool for MySQL
<Resource type="javax.sql.DataSource"
name="jdbc/TestDB"
factory="org.apache.tomcat.jdbc.pool.DataSourceFactory"
driverClassName="com.mysql.jdbc.Driver"
url="jdbc:mysql://localhost:3306/mysql"
username="mysql_user"
password="mypassword123"
/>
The first thing we notice is the factory="org.apache.tomcat.jdbc.pool.DataSourceFactory" attribute.
When Tomcat reads the type="javax.sql.DataSource" it will automatically configure its repackaged DBCP, unless you specify a different factory. The factory object is what creates and configures the connection pool itself.
There are two ways to configure Resource elements in Apache Tomcat.
Configure a global connection pool
File: conf/server.xml
<GlobalNamingResources>
<Resource type="javax.sql.DataSource"
name="jdbc/TestDB"
factory="org.apache.tomcat.jdbc.pool.DataSourceFactory"
driverClassName="com.mysql.jdbc.Driver"
url="jdbc:mysql://localhost:3306/mysql"
username="mysql_user"
password="mypassword123"
/>
</GlobalNamingResources>
You then create a ResourceLink element to make the pool available to the web applications. If you want the pool available to all applications under the same name, the easiest way is to edit the file conf/context.xml:
<Context>
<ResourceLink type="javax.sql.DataSource"
name="jdbc/LocalTestDB"
global="jdbc/TestDB"
/>
</Context>
Note that if you don't want a global pool, move the Resource element from server.xml into the context.xml file of the web application.
And to retrieve a connection from this configuration, the simple Java code looks like
Context initContext = new InitialContext();
Context envContext = (Context)initContext.lookup("java:/comp/env");
DataSource datasource = (DataSource)envContext.lookup("jdbc/LocalTestDB");
Connection con = datasource.getConnection();
Simple in Java
We can achieve the same configuration using just Java syntax.
DataSource ds = new DataSource();
ds.setDriverClassName("com.mysql.jdbc.Driver");
ds.setUrl("jdbc:mysql://localhost:3306/mysql");
ds.setUsername("root");
ds.setPassword("password");
Or to separate out the pool properties
PoolProperties pp = new PoolProperties();
pp.setDriverClassName("com.mysql.jdbc.Driver");
pp.setUrl("jdbc:mysql://localhost:3306/mysql");
pp.setUsername("root");
pp.setPassword("password");
DataSource ds = new DataSource(pp);
All properties that we make available in XML through the object factory are also available directly on the PoolProperties or the DataSource objects.
Sizing the connection pool
We will work with the following attributes to size the connection pool
initialSize
maxActive
maxIdle
minIdle
It's important to understand these attributes; they seem quite obvious, but there are some secrets. Let's nail them down.
<Resource type="javax.sql.DataSource"
name="jdbc/TestDB"
factory="org.apache.tomcat.jdbc.pool.DataSourceFactory"
driverClassName="com.mysql.jdbc.Driver"
url="jdbc:mysql://localhost:3306/mysql"
username="mysql_user"
password="mypassword123"
initialSize="10"
maxActive="100"
maxIdle="50"
minIdle="10"
/>
The initialSize=10 is the number of connections that will be established when the connection pool is created
When defined in GlobalNamingResources the pool is created upon Tomcat startup
When defined in Context the pool is created when it's first looked up in JNDI
The maxActive=100 is the maximum number of established connections to the database. This attribute is used to limit the number of connections a pool can have open so that capacity planning can be done on the database side.
The minIdle=10 is the minimum number of connections that stay established once the pool has grown to this size. The pool can shrink below this number if the maxAge attribute is used and a connection that would have gone back to the idle pool is closed because it has been connected for too long. Typically, however, the number of open connections does not go below this value.
The maxIdle attribute is a little bit trickier. It behaves differently depending on if the pool sweeper is enabled. The pool sweeper is a background thread that can test idle connections and resize the pool while the pool is active. The sweeper is also responsible for connection leak detection. The pool sweeper is defined by
public boolean isPoolSweeperEnabled() {
boolean timer = getTimeBetweenEvictionRunsMillis()>0;
boolean result = timer && (isRemoveAbandoned() && getRemoveAbandonedTimeout()>0);
result = result || (timer && getSuspectTimeout()>0);
result = result || (timer && isTestWhileIdle() && getValidationQuery()!=null);
return result;
}
The sweeper runs every timeBetweenEvictionRunsMillis milliseconds.
The maxIdle attribute is defined as follows:
Pool sweeper disabled - If the idle pool is larger than maxIdle, the connection will be closed when returned to the pool
Pool sweeper enabled - the number of idle connections can grow beyond maxIdle but can shrink down to minIdle if a connection has been idle for longer than minEvictableIdleTimeMillis. It may sound strange that the pool will not close connections even when the idle pool is larger than maxIdle, but it is actually optimal behavior. Imagine the following scenario:
100 parallel requests served by 100 threads
Each thread borrows a connection 3 times during a request
In this scenario, if we had maxIdle="50" then we could end up closing and opening 50x3 connections. This taxes the database and slows down the application. During peak traffic spikes like this, we want to be able to utilize all the pooled connections, so we definitely want the pool sweeper enabled. We will get to that in the next section. There is an additional attribute mentioned here, maxAge. maxAge defines the time in milliseconds that a connection may stay open/established. When a connection is returned to the pool, if the time since it was first connected exceeds the maxAge value, it is closed.
As we saw in the isPoolSweeperEnabled algorithm, the sweeper is enabled when one of the following conditions is met:
timeBetweenEvictionRunsMillis>0 AND removeAbandoned=true AND removeAbandonedTimeout>0
timeBetweenEvictionRunsMillis>0 AND suspectTimeout>0
timeBetweenEvictionRunsMillis>0 AND testWhileIdle=true AND validationQuery!=null
As of version 1.0.9 the following condition has been added: timeBetweenEvictionRunsMillis>0 AND minEvictableIdleTimeMillis>0
result = result || (timer && getMinEvictableIdleTimeMillis()>0);
So in order to get optimal pool sizing, we'd like to modify our configuration to meet one of these conditions:
<Resource type="javax.sql.DataSource"
name="jdbc/TestDB"
factory="org.apache.tomcat.jdbc.pool.DataSourceFactory"
driverClassName="com.mysql.jdbc.Driver"
url="jdbc:mysql://localhost:3306/mysql"
username="mysql_user"
password="mypassword123"
initialSize="10"
maxActive="100"
maxIdle="50"
minIdle="10"
suspectTimeout="60"
timeBetweenEvictionRunsMillis="30000"
minEvictableIdleTimeMillis="60000"
/>
Validating Connections
Pooling database connections presents a challenge, since pooled connections can become stale. It's often the case that either the database, or perhaps a device between the pool and the database, times out the connection. The only way to truly validate a connection is to make a round trip to the database to ensure the session is still active. In Java 6, the JDBC API addressed this by supplying an isValid call on the java.sql.Connection interface. Prior to that, pools had to resort to executing a query, such as SELECT 1 on MySQL. This query is easy for the database to parse and doesn't require any disk access. The isValid call is scheduled to be implemented, but the pool, intended to be usable with Apache Tomcat 6, must also preserve Java 5 compatibility.
Validation Queries
Validation queries present a few challenges
If called too often, they can degrade the performance of the system
If called too far apart, they can result in stale connections
If the application calls setTransactionIsolation with autoCommit=false, it can yield a SQLException if the application tries to call setTransactionIsolation again, since the validation query might have already initiated a new transaction in the DB.
Let's look at the most typical configuration:
<Resource type="javax.sql.DataSource"
name="jdbc/TestDB"
factory="org.apache.tomcat.jdbc.pool.DataSourceFactory"
driverClassName="com.mysql.jdbc.Driver"
url="jdbc:mysql://localhost:3306/mysql"
username="mysql_user"
password="mypassword123"
testOnBorrow="true"
validationQuery="SELECT 1"
/>
With this configuration, the query SELECT 1 is executed each time the Java code calls Connection con = dataSource.getConnection();.
This guarantees that the connection has been tested before it's handed to the application. However, for applications using connections very frequently for short periods of time, this has a severe impact on performance. The two other configuration options:
testWhileIdle
testOnReturn
are not really that helpful, as they do test the connection, but at the wrong time.
Not having validation is not really an option for a lot of applications. Some applications get around it by setting minIdle=0 and a low minEvictableIdleTimeMillis value, so that if connections sit idle long enough for the database session to time out, the pool will already have timed them out as idle before that happens.
The better solution is to test connections that have not been tested for a while.
<Resource type="javax.sql.DataSource"
name="jdbc/TestDB"
factory="org.apache.tomcat.jdbc.pool.DataSourceFactory"
driverClassName="com.mysql.jdbc.Driver"
url="jdbc:mysql://localhost:3306/mysql"
username="mysql_user"
password="mypassword123"
testOnBorrow="true"
validationQuery="SELECT 1"
validationInterval="30000"
/>
In this configuration, connections are validated, but no more often than every 30 seconds. It's a compromise between performance and connection validation. And as mentioned, if we want to do away with validation altogether, we can configure the pool to time out idle connections:
<Resource type="javax.sql.DataSource"
name="jdbc/TestDB"
factory="org.apache.tomcat.jdbc.pool.DataSourceFactory"
driverClassName="com.mysql.jdbc.Driver"
url="jdbc:mysql://localhost:3306/mysql"
username="mysql_user"
password="mypassword123"
timeBetweenEvictionRunsMillis="5000"
minEvictableIdleTimeMillis="5000"
minIdle="0"
/>
Setting up a custom database session
In some use cases it is required to perform some tasks when a new database session is initialized. This could involve executing a simple SQL statement or calling a stored procedure.
This is typically done at the database level, where you can create triggers.
create or replace trigger logon_alter_session after logon on database
begin
if sys_context('USERENV', 'SESSION_USER') = 'TEMP' then
EXECUTE IMMEDIATE 'alter session ....';
end if;
end;
/
This would however affect all users. In situations where that is not sufficient, we may want a custom statement to be executed when a new session is created:
<Resource name="jdbc/TestDB" auth="Container"
type="javax.sql.DataSource"
description="Oracle Datasource"
url="jdbc:oracle:thin:@//localhost:1521/orcl"
driverClassName="oracle.jdbc.driver.OracleDriver"
username="default_user"
password="password"
maxActive="100"
validationQuery="select 1 from dual"
validationInterval="30000"
testOnBorrow="true"
initSQL="ALTER SESSION SET NLS_DATE_FORMAT = 'YYYY MM DD HH24:MI:SS'"/>
The initSQL is executed exactly once per connection, and that is when the connection is established.
Connection pool leaks and long running queries
Connection pools also contain some diagnostics. Both jdbc-pool and Commons DBCP are able to detect and mitigate connections that are not being returned to the pool. These are referred to as abandoned or leaked connections, as demonstrated here.
Connection con = dataSource.getConnection();
Statement st = con.createStatement();
st.executeUpdate("insert into id(value) values (1')"); // SQLException here
con.close();
There are five configuration settings used to detect these types of error conditions; the first three are shared with Commons DBCP.
<Resource type="javax.sql.DataSource"
name="jdbc/TestDB"
factory="org.apache.tomcat.jdbc.pool.DataSourceFactory"
driverClassName="com.mysql.jdbc.Driver"
url="jdbc:mysql://localhost:3306/mysql"
username="mysql_user"
password="mypassword123"
maxActive="100"
timeBetweenEvictionRunsMillis="30000"
removeAbandoned="true"
removeAbandonedTimeout="60"
logAbandoned="true"
/>
removeAbandoned - set to true if we want to detect leaked connections
removeAbandonedTimeout - the number of seconds from when dataSource.getConnection was called to when we consider it abandoned
logAbandoned - set to true if we should log that a connection was abandoned. If this option is set to true, a stack trace is recorded during the dataSource.getConnection call and is printed when a connection is not returned.
There are of course use cases when we want this type of diagnostics, but we are also running batch jobs that hold a connection for minutes at a time. How do we handle that?
Two additional options/features have been added to support these
<Resource type="javax.sql.DataSource"
name="jdbc/TestDB"
factory="org.apache.tomcat.jdbc.pool.DataSourceFactory"
driverClassName="com.mysql.jdbc.Driver"
url="jdbc:mysql://localhost:3306/mysql"
username="mysql_user"
password="mypassword123"
maxActive="100"
timeBetweenEvictionRunsMillis="30000"
removeAbandoned="true"
removeAbandonedTimeout="60"
logAbandoned="true"
abandonWhenPercentageFull="50"
/>
abandonWhenPercentageFull - a connection is only considered abandoned if it has met the removeAbandonedTimeout threshold AND the number of connections in use exceeds this percentage of the pool's capacity.
Using this property gives a grace period to connections that would otherwise have been considered abandoned, possibly as a false positive. Setting the value to 100 means that connections are not considered abandoned unless we've reached the maxActive limit. This gives the pool a bit more flexibility, but it doesn't address our 5-minute batch job that uses a single connection. In that case, we want to reset the timeout timer whenever we detect that the connection is still being used, so that the connection won't be considered abandoned. We do this by inserting an interceptor.
<Resource type="javax.sql.DataSource"
name="jdbc/TestDB"
factory="org.apache.tomcat.jdbc.pool.DataSourceFactory"
driverClassName="com.mysql.jdbc.Driver"
url="jdbc:mysql://localhost:3306/mysql"
username="mysql_user"
password="mypassword123"
maxActive="100"
timeBetweenEvictionRunsMillis="30000"
removeAbandoned="true"
removeAbandonedTimeout="60"
logAbandoned="true"
abandonWhenPercentageFull="50"
jdbcInterceptors="ResetAbandonedTimer"
/>
The interceptor, org.apache.tomcat.jdbc.pool.interceptor.ResetAbandonedTimer, can be specified by its fully qualified name or, if it lives in the org.apache.tomcat.jdbc.pool.interceptor package, by its short class name.
Each time a statement is prepared or a query is executed, the interceptor resets the abandoned timer on the pooled connection. This way, the 5 minute batch job, doing lots of queries and updates, will not time out.
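The same leak-detection setup can be expressed programmatically instead of in XML. A minimal sketch using the pool's PoolProperties API (the connection details mirror the hypothetical MySQL setup used throughout this article):

```java
import org.apache.tomcat.jdbc.pool.DataSource;
import org.apache.tomcat.jdbc.pool.PoolProperties;

public class LeakDetectionConfig {
    public static DataSource create() {
        PoolProperties pp = new PoolProperties();
        pp.setUrl("jdbc:mysql://localhost:3306/mysql");
        pp.setDriverClassName("com.mysql.jdbc.Driver");
        pp.setUsername("mysql_user");
        pp.setPassword("mypassword123");
        pp.setMaxActive(100);
        // The sweeper must run for abandoned-connection handling to work.
        pp.setTimeBetweenEvictionRunsMillis(30000);
        pp.setRemoveAbandoned(true);
        pp.setRemoveAbandonedTimeout(60);
        pp.setLogAbandoned(true);
        pp.setAbandonWhenPercentageFull(50);
        // Reset the abandoned timer on every statement so long-running
        // batch jobs are not reclaimed while still in use.
        pp.setJdbcInterceptors("ResetAbandonedTimer");
        return new DataSource(pp);
    }
}
```

Each setter corresponds one-to-one with the XML attribute of the same name, so the snippet above is equivalent to the Resource element shown earlier.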
There are of course situations where you want to know about these scenarios, but you don't want to kill or reclaim the connection, since you are not aware of what impact that will have on your system.
<Resource type="javax.sql.DataSource"
name="jdbc/TestDB"
factory="org.apache.tomcat.jdbc.pool.DataSourceFactory"
driverClassName="com.mysql.jdbc.Driver"
url="jdbc:mysql://localhost:3306/mysql"
username="mysql_user"
password="mypassword123"
maxActive="100"
timeBetweenEvictionRunsMillis="30000"
logAbandoned="true"
suspectTimeout="60"
jdbcInterceptors="ResetAbandonedTimer"
/>
The suspectTimeout attribute works in the exact same way as the removeAbandonedTimeout except that instead of closing the connection, it simply logs a warning and issues a JMX notification with the information. This way, you can find out about these leaks or long running queries without changing the behavior of your system.
Pooling connections from other data sources
So far we have been dealing with connection pooling around connections acquired using the java.sql.Driver interface. Hence we used the attributes
driverClassName
url
However, some connection configurations are done using the javax.sql.DataSource or even the javax.sql.XADataSource interfaces, and we need to be able to support those configurations.
In plain Java this is relatively easy.
PoolProperties pp = new PoolProperties();
pp.setDataSource(myOtherDataSource);
DataSource ds = new DataSource(pp);
Connection con = ds.getConnection();
Or
DataSource ds = new DataSource();
ds.setDataSource(myOtherDataSource);
Connection con = ds.getConnection();
We are able to inject another javax.sql.DataSource or javax.sql.XADataSource object and use that for connection retrieval.
This comes in handy when we deal with XA connections.
For XML configuration, jdbc-pool comes with the org.apache.tomcat.jdbc.naming.GenericNamingResourcesFactory class, a simple class that allows configuration of any type of named resource. To set up an Apache Derby XADataSource we can create this snippet:
<Resource factory="org.apache.tomcat.jdbc.naming.GenericNamingResourcesFactory"
name="jdbc/DerbyXA1"
type="org.apache.derby.jdbc.ClientXADataSource"
databaseName="sample1"
createDatabase="create"
serverName="localhost"
portNumber="1527"
user="sample1"
password="password"/>
This is a simple XADataSource that connects to a networked Derby instance on port 1527.
And if you want to pool the XA connections from this data source, you can create the connection pool element next to it.
<Resource factory="org.apache.tomcat.jdbc.naming.GenericNamingResourcesFactory"
name="jdbc/DerbyXA1"
type="org.apache.derby.jdbc.ClientXADataSource"
databaseName="sample1"
createDatabase="create"
serverName="localhost"
portNumber="1527"
user="sample1"
password="password"/>
<Resource factory="org.apache.tomcat.jdbc.pool.DataSourceFactory"
dataSourceJNDI="DerbyXA1"
name="jdbc/TestDB1"
auth="Container"
type="javax.sql.XADataSource"
testWhileIdle="true"
testOnBorrow="true"
testOnReturn="false"
validationQuery="SELECT 1"
validationInterval="30000"
timeBetweenEvictionRunsMillis="5000"
maxActive="100"
minIdle="10"
maxIdle="20"
maxWait="10000"
initialSize="10"
removeAbandonedTimeout="60"
removeAbandoned="true"
logAbandoned="true"
minEvictableIdleTimeMillis="30000"
jmxEnabled="true"
jdbcInterceptors="ConnectionState;StatementFinalizer;SlowQueryReportJmx(threshold=10000)"
abandonWhenPercentageFull="75"/>
Note how type=javax.sql.XADataSource is set; this will create an org.apache.tomcat.jdbc.pool.XADataSource instead of an org.apache.tomcat.jdbc.pool.DataSource.
Here we are linking the two data sources using the dataSourceJNDI=DerbyXA1 attribute. The two data sources both have to exist in the same namespace, in our example, the jdbc namespace.
Currently JNDI lookup through DataSource.setDataSourceJNDI(...) is not supported, only through the factory object.
If you inject a
- javax.sql.DataSource object - the pool will invoke the javax.sql.DataSource.getConnection() method
- javax.sql.DataSource object, but specify username/password on the pool - the pool will invoke the javax.sql.DataSource.getConnection(String username, String password) method
- javax.sql.XADataSource object - the pool will invoke the javax.sql.XADataSource.getXAConnection() method
- javax.sql.XADataSource object, but specify username/password on the pool - the pool will invoke the javax.sql.XADataSource.getXAConnection(String username, String password) method
Here is an interesting phenomenon that comes up when you deal with XADataSources. You can cast the returned object as either a java.sql.Connection or a javax.sql.XAConnection and invoke methods of both interfaces on the same object.
DataSource ds = new DataSource();
ds.setDataSource(myOtherDataSource);
Connection con = ds.getConnection();
if (con instanceof XAConnection) {
XAConnection xacon = (XAConnection)con;
transactionManager.enlistResource(xacon.getXAResource());
}
Statement st = con.createStatement();
ResultSet rs = st.executeQuery("SELECT 1");
JDBC Interceptors
To make the implementation flexible, the concept of JDBC interceptors was created. The javax.sql.PooledConnection that wraps the java.sql.Connection/javax.sql.XAConnection from the underlying driver or data source is itself an interceptor. The interceptors are based on the java.lang.reflect.InvocationHandler interface. An interceptor is a class that extends the org.apache.tomcat.jdbc.pool.JdbcInterceptor class.
In this article, we'll cover how interceptors are configured. In our next article, we will go over how to implement custom interceptors and their life cycle.
<Resource factory="org.apache.tomcat.jdbc.pool.DataSourceFactory"
...
jdbcInterceptors="ConnectionState;StatementFinalizer;SlowQueryReportJmx(threshold=10000)"
/>
Is the same as
<Resource factory="org.apache.tomcat.jdbc.pool.DataSourceFactory"
...
jdbcInterceptors="org.apache.tomcat.jdbc.pool.interceptor.ConnectionState;
org.apache.tomcat.jdbc.pool.interceptor.StatementFinalizer;
org.apache.tomcat.jdbc.pool.interceptor.SlowQueryReportJmx(threshold=10000)"
/>
A short class name, such as ConnectionState, can be used if the interceptor is defined in the org.apache.tomcat.jdbc.pool.interceptor package. Otherwise, a fully qualified name is required.
Interceptors are defined in a semicolon (;) separated string. Interceptors can have zero or more parameters, defined within parentheses as comma-separated simple key-value pairs.
Connection State
The java.sql.Connection interface exposes a few attributes
autoCommit
readOnly
transactionIsolation
catalog
The default value of these attributes can be configured using the following properties for the connection pool.
defaultAutoCommit
defaultReadOnly
defaultTransactionIsolation
defaultCatalog
If set, the connection is configured accordingly when it is established to the database. If the ConnectionState interceptor is not configured, setting these properties is a one-time operation that only takes place during connection establishment. If the ConnectionState interceptor is configured, the connection is reset to the desired state each time it's borrowed from the pool.
Some of these methods result in round trips to the database when queried. For example, a call to Connection.getTransactionIsolation() makes the driver query the transaction isolation level of the current session. Such round trips can have a severe performance impact on applications that use connections very frequently for very short, fast operations. The ConnectionState interceptor caches these values and intercepts calls to the methods that query them, avoiding the round trips.
Statement Finalizer
Java code using the java.sql objects should do proper cleanup and closure of resources after they have been used.
An example of code cleanup
Connection con = null;
Statement st = null;
ResultSet rs = null;
try {
con = ds.getConnection();
...
} finally {
if (rs!=null) try { rs.close(); } catch (Exception ignore){}
if (st!=null) try { st.close(); } catch (Exception ignore){}
if (con!=null) try { con.close();} catch (Exception ignore){}
}
Some applications are not always written in this way. We previously showed how to configure the pool to diagnose and warn when connections were not closed properly.
The StatementFinalizer interceptor makes sure that the java.sql.Statement and its subclasses are properly closed when a connection is returned to the pool.
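For code you control, the same guarantee is available in the language itself; since Java 7, try-with-resources closes the result set, statement, and connection automatically. A minimal sketch (the method and query are illustrative, not from the article):

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import javax.sql.DataSource;

public class CleanupExample {
    // Resources are closed in reverse order of acquisition (rs, st, con),
    // even if an exception is thrown anywhere inside the block, so the
    // connection is always returned to the pool.
    static int readOne(DataSource ds) throws SQLException {
        try (Connection con = ds.getConnection();
             Statement st = con.createStatement();
             ResultSet rs = st.executeQuery("SELECT 1")) {
            rs.next();
            return rs.getInt(1);
        }
    }
}
```

The StatementFinalizer interceptor remains useful as a safety net for third-party code that does not follow this pattern.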
Getting hold of the actual JDBC connection
The connection proxy that is returned implements javax.sql.PooledConnection, so retrieving the underlying connection is pretty straightforward; there is no need to cast to a specific class to do so.
The same applies if you''ve configured the pool to handle javax.sql.XAConnection.
Another interesting way of retrieving the underlying connection is
Connection con = ds.getConnection();
Connection underlyingconnection = con.createStatement().getConnection();
This is because jdbc-pool does not proxy statements by default. There is of course an interceptor that protects against this use case.
And that's it for now, folks. Stay tuned for more in-depth articles that will take us under the hood of some neat concurrency traps and tricks.
Go Concurrency Patterns: Timing out, moving on
Translated from the Go Blog. Original: https://blog.golang.org/go-co...
Concurrent programming has its own idioms, and timeouts are one of them. Although Go's channels do not support timeouts directly, they are easy to implement. Suppose we want to receive a value from the channel ch, but want to wait at most one second for it to arrive. We can create a signalling channel and start a goroutine that sleeps for one second and then sends a value on it:
timeout := make(chan bool, 1)
go func() {
time.Sleep(1 * time.Second)
timeout <- true
}()
We can then use a select statement to receive from either ch or timeout. If nothing arrives on ch after one second, the timeout case is selected and the attempt to read from ch is abandoned.
select {
case <-ch:
// a read from ch has occurred
case <-timeout:
// the read from ch has timed out
}
The timeout channel is buffered with space for one value, so the timeout goroutine can send to the channel and then exit. The goroutine doesn't know (or care) whether the value is received. So even if the receive on ch happens before the timeout, the timeout goroutine won't hang around forever; the timeout channel will eventually be reclaimed by the garbage collector.
(In the example above we used time.Sleep to demonstrate the mechanics of goroutines and channels. In real programs you should use time.After, a function that returns a channel and sends a value on that channel after the specified duration.)
Let's look at another variant of this pattern. This time we need to read from multiple replicated databases simultaneously, and the program needs only the first result to arrive.

The function Query below takes two parameters: a slice of database connections and a query string. It queries each of the databases in parallel and returns the first response it receives:
func Query(conns []Conn, query string) Result {
ch := make(chan Result, 1)
for _, conn := range conns {
go func(c Conn) {
select {
case ch <- c.DoQuery(query):
default:
}
}(conn)
}
return <-ch
}
In this example, the closure after the go keyword performs a non-blocking send on the channel, because the DoQuery call is placed in a select statement with a default case: if the send cannot proceed immediately, the default case is selected. Making the send non-blocking guarantees that none of the goroutines launched in the loop will block forever. However, if a second query result arrives before the main function has taken the first value out of ch and returned, that send could fail because the channel is not ready to receive.
The problem just described is a textbook example of a race condition, and the example code resolves it idiomatically: we simply give the channel a buffer (by passing 1 as the second argument to make), guaranteeing that the first sender has space to write its value. This ensures the first send on the channel always succeeds, so the first value to arrive becomes the return value, regardless of the order of execution.
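The first-send-wins behaviour of a 1-buffered channel can be isolated in a sketch; firstResult is a hypothetical helper, not part of the article's Query code:

```go
package main

import "fmt"

// firstResult fans the given values out to separate goroutines and
// keeps only the first one that lands in the 1-buffered channel.
// Every later send hits the default branch and is dropped.
func firstResult(values []string) string {
	ch := make(chan string, 1) // buffer of 1: the first send always succeeds
	for _, v := range values {
		go func(s string) {
			select {
			case ch <- s: // wins the race
			default: // channel already holds a result; drop s
			}
		}(v)
	}
	return <-ch
}

func main() {
	got := firstResult([]string{"replica-a", "replica-b", "replica-c"})
	fmt.Println("first result:", got) // one of the three, whichever goroutine ran first
}
```

Which value wins is deliberately nondeterministic; the guarantee is only that exactly one send succeeds and the rest are discarded without blocking.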
These two simple examples show how sophisticated the collaboration between Go's goroutines can be.
Go Concurrency Patterns: Context
https://blog.golang.org/context
Introduction
In Go servers, each incoming request is handled in its own goroutine. Request handlers often start additional goroutines to access backends such as databases and RPC services. The set of goroutines working on a request typically needs access to request-specific values such as the identity of the end user, authorization tokens, and the request's deadline. When a request is canceled or times out, all the goroutines working on that request should exit quickly so the system can reclaim any resources they are using.
At Google, we developed a context package that makes it easy to pass request-scoped values, cancelation signals, and deadlines across API boundaries to all the goroutines involved in handling a request. The package is publicly available as context. This article describes how to use the package and provides a complete working example.
Context
The core of the context package is the Context type:
// A Context carries a deadline, cancelation signal, and request-scoped values
// across API boundaries. Its methods are safe for simultaneous use by multiple
// goroutines.
type Context interface {
// Done returns a channel that is closed when this Context is canceled
// or times out.
Done() <-chan struct{}
// Err indicates why this context was canceled, after the Done channel
// is closed.
Err() error
// Deadline returns the time when this Context will be canceled, if any.
Deadline() (deadline time.Time, ok bool)
// Value returns the value associated with key or nil if none.
Value(key interface{}) interface{}
}
(This description is condensed; the godoc is authoritative.)
The Done method returns a channel that acts as a cancelation signal to functions running on behalf of the Context: when the channel is closed, the functions should abandon their work and return. The Err method returns an error indicating why the Context was canceled. The Pipelines and Cancelation article discusses the Done channel idiom in more detail.
A Context does not have a Cancel method for the same reason the Done channel is receive-only: the function receiving a cancelation signal is usually not the one that sends the signal. In particular, when a parent operation starts goroutines for sub-operations, those sub-operations should not be able to cancel the parent. Instead, the WithCancel function (described below) provides a way to cancel a new Context value.
A Context is safe for simultaneous use by multiple goroutines. Code can pass a single Context to any number of goroutines and cancel that Context to signal all of them.
The Deadline method allows functions to determine whether they should start work at all; if too little time is left, it may not be worthwhile. Code may also use a deadline to set timeouts for I/O operations.
Value allows a Context to carry request-scoped data. That data must be safe for simultaneous use by multiple goroutines.
Derived contexts
The context package provides functions to derive new Context values from existing ones. These values form a tree: when a Context is canceled, all Contexts derived from it are also canceled.
Background is the root of any Context tree; it is never canceled:
// Background returns an empty Context. It is never canceled, has no deadline,
// and has no values. Background is typically used in main, init, and tests,
// and as the top-level Context for incoming requests.
func Background() Context
WithCancel and WithTimeout return derived Context values that can be canceled sooner than the parent Context. The Context associated with an incoming request is typically canceled when the request handler returns. WithCancel is also useful for canceling redundant requests when using multiple replicas. WithTimeout is useful for setting a deadline on requests to backend servers:
// WithCancel returns a copy of parent whose Done channel is closed as soon as
// parent.Done is closed or cancel is called.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
// A CancelFunc cancels a Context.
type CancelFunc func()
// WithTimeout returns a copy of parent whose Done channel is closed as soon as
// parent.Done is closed, cancel is called, or timeout elapses. The new
// Context's Deadline is the sooner of now+timeout and the parent's deadline, if
// any. If the timer is still running, the cancel function releases its
// resources.
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
WithValue provides a way to associate request-scoped values with a Context:
// WithValue returns a copy of parent whose Value method returns val for key.
func WithValue(parent Context, key interface{}, val interface{}) Context
The best way to see how to use the context package is through a worked example.
Example: Google Web Search
Our example is an HTTP server that handles URLs like /search?q=golang&timeout=1s by forwarding the query "golang" to the Google Web Search API and rendering the results. The timeout parameter tells the server to cancel the request after that duration elapses.
The code is split across three packages:
- server provides the main function and the handler for /search.
- userip provides functions for extracting a user IP address from a request and associating it with a Context.
- google provides the Search function for sending a query to Google.
The server program
The server program handles requests like /search?q=golang by serving the first few Google search results for golang. It registers handleSearch to handle the /search endpoint. The handler creates an initial Context called ctx and arranges for it to be canceled when the handler returns. If the request includes the timeout URL parameter, the Context is canceled automatically when the timeout elapses:
func handleSearch(w http.ResponseWriter, req *http.Request) {
// ctx is the Context for this handler. Calling cancel closes the
// ctx.Done channel, which is the cancellation signal for requests
// started by this handler.
var (
ctx context.Context
cancel context.CancelFunc
)
timeout, err := time.ParseDuration(req.FormValue("timeout"))
if err == nil {
// The request has a timeout, so create a context that is
// canceled automatically when the timeout expires.
ctx, cancel = context.WithTimeout(context.Background(), timeout)
} else {
ctx, cancel = context.WithCancel(context.Background())
}
defer cancel() // Cancel ctx as soon as handleSearch returns.
}
The handler extracts the query from the request and extracts the client's IP address by calling the userip package. The client's IP address is needed for backend requests, so handleSearch attaches it to ctx:
// Check the search query.
query := req.FormValue("q")
if query == "" {
http.Error(w, "no query", http.StatusBadRequest)
return
}
// Store the user IP in ctx for use by code in other packages.
userIP, err := userip.FromRequest(req)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
ctx = userip.NewContext(ctx, userIP)
The handler calls google.Search with ctx and the query:
// Run the Google search and print the results.
start := time.Now()
results, err := google.Search(ctx, query)
elapsed := time.Since(start)
If the search succeeds, the handler renders the results:
if err := resultsTemplate.Execute(w, struct {
Results google.Results
Timeout, Elapsed time.Duration
}{
Results: results,
Timeout: timeout,
Elapsed: elapsed,
}); err != nil {
log.Print(err)
return
}
Package userip
The userip package provides functions for extracting a user IP address from a request and associating it with a Context
. A Context
provides a key-value mapping, where the keys and values are both of type interface{}
. Key types must support equality, and values must be safe for simultaneous use by multiple goroutines. Packages like userip
hide the details of this mapping and provide strongly-typed access to a specific Context
value.
To avoid key collisions, userip defines an unexported type key and uses a value of this type as the context key:
// The key type is unexported to prevent collisions with context keys defined in
// other packages.
type key int
// userIPkey is the context key for the user IP address. Its value of zero is
// arbitrary. If this package defined other context keys, they would have
// different integer values.
const userIPKey key = 0
FromRequest extracts a userIP value from an http.Request:
func FromRequest(req *http.Request) (net.IP, error) {
	ip, _, err := net.SplitHostPort(req.RemoteAddr)
	if err != nil {
		return nil, fmt.Errorf("userip: %q is not IP:port", req.RemoteAddr)
	}
	userIP := net.ParseIP(ip)
	if userIP == nil {
		return nil, fmt.Errorf("userip: %q is not IP:port", req.RemoteAddr)
	}
	return userIP, nil
}
NewContext returns a new Context that carries a provided userIP value:
func NewContext(ctx context.Context, userIP net.IP) context.Context {
return context.WithValue(ctx, userIPKey, userIP)
}
FromContext extracts a userIP from a Context:
func FromContext(ctx context.Context) (net.IP, bool) {
// ctx.Value returns nil if ctx has no value for the key;
// the net.IP type assertion returns ok=false for nil.
userIP, ok := ctx.Value(userIPKey).(net.IP)
return userIP, ok
}
Package google
The google.Search function makes an HTTP request to the Google Web Search API and parses the JSON-encoded result. It accepts a Context
parameter ctx
and returns immediately if ctx.Done
is closed while the request is in flight.
The Google Web Search API request includes the search query and the user IP as query parameters:
func Search(ctx context.Context, query string) (Results, error) {
// Prepare the Google Search API request.
req, err := http.NewRequest("GET", "https://ajax.googleapis.com/ajax/services/search/web?v=1.0", nil)
if err != nil {
return nil, err
}
q := req.URL.Query()
q.Set("q", query)
// If ctx is carrying the user IP address, forward it to the server.
// Google APIs use the user IP to distinguish server-initiated requests
// from end-user requests.
if userIP, ok := userip.FromContext(ctx); ok {
q.Set("userip", userIP.String())
}
req.URL.RawQuery = q.Encode()
}
Search uses a helper function, httpDo, to issue the HTTP request and cancel it if ctx.Done is closed while the request or response is being processed. Search passes a closure to httpDo to handle the HTTP response:
var results Results
err = httpDo(ctx, req, func(resp *http.Response, err error) error {
if err != nil {
return err
}
defer resp.Body.Close()
// Parse the JSON search result.
// https://developers.google.com/web-search/docs/#fonje
var data struct {
ResponseData struct {
Results []struct {
TitleNoFormatting string
URL string
}
}
}
if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
return err
}
for _, res := range data.ResponseData.Results {
results = append(results, Result{Title: res.TitleNoFormatting, URL: res.URL})
}
return nil
})
// httpDo waits for the closure we provided to return, so it''s safe to
// read results here.
return results, err
The httpDo function runs the HTTP request and processes its response in a new goroutine. It cancels the request if ctx.Done is closed before the goroutine exits:
func httpDo(ctx context.Context, req *http.Request, f func(*http.Response, error) error) error {
// Run the HTTP request in a goroutine and pass the response to f.
tr := &http.Transport{}
client := &http.Client{Transport: tr}
c := make(chan error, 1)
go func() { c <- f(client.Do(req)) }()
select {
case <-ctx.Done():
tr.CancelRequest(req)
<-c // Wait for f to return.
return ctx.Err()
case err := <-c:
return err
}
}
Adapting code for Contexts
Many server frameworks provide packages and types for carrying request-scoped values. We can define new implementations of the Context
interface to bridge between code using existing frameworks and code that expects a Context
parameter.
For example, Gorilla''s github.com/gorilla/context package allows handlers to associate data with incoming requests by providing a mapping from HTTP requests to key-value pairs. In gorilla.go, we provide a Context
implementation whose Value
method returns the values associated with a specific HTTP request in the Gorilla package.
Other packages have provided cancelation support similar to Context. For example, Tomb provides a Kill method that signals cancelation by closing a Dying channel. Tomb also provides methods to wait for those goroutines to exit, similar to sync.WaitGroup. In tomb.go, we provide a Context implementation that is canceled when either its parent Context is canceled or a provided Tomb is killed.
Conclusion
At Google, we require that Go programmers pass a Context parameter as the first argument to every function on the call path between incoming and outgoing requests. This allows Go code developed by many different teams to interoperate well. It provides simple control over timeouts and cancelation and ensures that critical values like security credentials transit Go programs properly.
Server frameworks that want to build on Context should provide implementations of Context to bridge between their packages and those that expect a Context parameter. Their client libraries would then accept a Context from the calling code. By establishing a common interface for request-scoped data and cancelation, Context makes it easier for package developers to share code for creating scalable services.
By Sameer Ajmani
Using contexts to avoid leaking goroutines
https://rakyll.org/leakingctx/
The context package makes it possible to manage a chain of calls within the same call path by signaling context’s Done channel.
In this article, we will examine how to use the context package to avoid leaking goroutines.
Assume, you have a function that starts a goroutine internally. Once this function is called, the caller may not be able to terminate the goroutine started by the function.
// gen is a broken generator that will leak a goroutine.
func gen() <-chan int {
ch := make(chan int)
go func() {
var n int
for {
ch <- n
n++
}
}()
return ch
}
The generator above starts a goroutine with an infinite loop, but the caller consumes the values until n is equal to 5.
// The call site of gen doesn't have a way to stop the internal goroutine.
for n := range gen() {
fmt.Println(n)
if n == 5 {
break
}
}
Once the caller is done with the generator (when it breaks the loop), the goroutine will run forever executing the infinite loop. Our code will leak a goroutine.
We can avoid the problem by signaling the internal goroutine with a stop channel but there is a better solution: cancellable contexts. The generator can select on a context’s Done channel and once the context is done, the internal goroutine can be cancelled.
// gen is a generator that can be cancelled by cancelling ctx.
func gen(ctx context.Context) <-chan int {
ch := make(chan int)
go func() {
var n int
for {
select {
case <-ctx.Done():
return // avoid leaking of this goroutine when ctx is done.
case ch <- n:
n++
}
}
}()
return ch
}
Now, the caller can signal the generator when it is done consuming. Once the cancel function is called, the internal goroutine returns.
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // make sure all paths cancel the context to avoid context leak
for n := range gen(ctx) {
fmt.Println(n)
if n == 5 {
cancel()
break
}
}
// ...
The full program is available as a gist.
Context and Cancellation of goroutines
http://dahernan.github.io/2015/02/04/context-and-cancellation-of-goroutines/
Yesterday I went to the event London Go Gathering, where all the talks had a great level, but particularly Peter Bourgon gave me the idea to write about the excellent context package.
Context is used to pass request-scoped variables, but in this case I'm only going to focus on cancellation signals.
Let's say that I have a program that executes a long running function, in this case work, and we run it in a separate goroutine.
package main
import (
"fmt"
"sync"
"time"
)
var (
wg sync.WaitGroup
)
func work() error {
defer wg.Done()
for i := 0; i < 1000; i++ {
select {
case <-time.After(2 * time.Second):
fmt.Println("Doing some work ", i)
}
}
return nil
}
func main() {
fmt.Println("Hey, I'm going to do some work")
wg.Add(1)
go work()
wg.Wait()
fmt.Println("Finished. I'm going home")
}
$ go run work.go
Hey, I'm going to do some work
Doing some work 0
Doing some work 1
Doing some work 2
Doing some work 3
...
Doing some work 999
Finished. I'm going home
Now imagine that we have to call that work function from a user interaction or an http request; we probably don't want to wait forever for that goroutine to finish, so a common pattern is to set a timeout using a buffered channel, like this:
package main
import (
"fmt"
"log"
"time"
)
func work() error {
for i := 0; i < 1000; i++ {
select {
case <-time.After(2 * time.Second):
fmt.Println("Doing some work ", i)
}
}
return nil
}
func main() {
fmt.Println("Hey, I'm going to do some work")
ch := make(chan error, 1)
go func() {
ch <- work()
}()
select {
case err := <-ch:
if err != nil {
log.Fatal("Something went wrong :(", err)
}
case <-time.After(4 * time.Second):
fmt.Println("Life is to short to wait that long")
}
fmt.Println("Finished. I'm going home")
}
$ go run work.go
Hey, I'm going to do some work
Doing some work 0
Doing some work 1
Life is to short to wait that long
Finished. I'm going home
Now it is a little bit better, because the main execution doesn't have to wait for work if it times out.
But there is still a problem: if my program keeps running, like for example a web server, even though I don't wait for the work function to finish, its goroutine would still be running and consuming resources. So I need a way to cancel that goroutine.
For cancellation of the goroutine we can use the context package. We have to change the function to accept an argument of type context.Context; by convention it is usually the first argument.
package main
import (
"fmt"
"sync"
"time"
"golang.org/x/net/context"
)
var (
wg sync.WaitGroup
)
func work(ctx context.Context) error {
defer wg.Done()
for i := 0; i < 1000; i++ {
select {
case <-time.After(2 * time.Second):
fmt.Println("Doing some work ", i)
// we received the signal of cancelation in this channel
case <-ctx.Done():
fmt.Println("Cancel the context ", i)
return ctx.Err()
}
}
return nil
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
defer cancel()
fmt.Println("Hey, I'm going to do some work")
wg.Add(1)
go work(ctx)
wg.Wait()
fmt.Println("Finished. I'm going home")
}
$ go run work.go
Hey, I'm going to do some work
Doing some work 0
Cancel the context 1
Finished. I'm going home
This is pretty good! Apart from the fact that managing the timeout makes the code look simpler, we are now making sure that the work function doesn't waste any resources.
These examples are good for learning the basics, but let's try to make it more realistic. Now the work function is going to make an http request to a server, and the server is going to be this other program:
package main
// Lazy and Very Random Server
import (
"fmt"
"math/rand"
"net/http"
"time"
)
func main() {
http.HandleFunc("/", LazyServer)
http.ListenAndServe(":1111", nil)
}
// sometimes really fast server, sometimes really slow server
func LazyServer(w http.ResponseWriter, req *http.Request) {
headOrTails := rand.Intn(2)
if headOrTails == 0 {
time.Sleep(6 * time.Second)
fmt.Fprintf(w, "Go! slow %v", headOrTails)
fmt.Printf("Go! slow %v", headOrTails)
return
}
fmt.Fprintf(w, "Go! quick %v", headOrTails)
fmt.Printf("Go! quick %v", headOrTails)
return
}
Randomly it is going to be very quick or very slow; we can check that with curl:
$ curl http://localhost:1111/
Go! quick 1
$ curl http://localhost:1111/
Go! quick 1
$ curl http://localhost:1111/
*some seconds later*
Go! slow 0
So we are going to make an http request to this server in a goroutine, but if the server is slow we are going to cancel the request and return quickly, so we can handle the cancellation and free the connection.
package main
import (
"fmt"
"io/ioutil"
"net/http"
"sync"
"time"
"golang.org/x/net/context"
)
var (
wg sync.WaitGroup
)
// main is not changed
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
fmt.Println("Hey, I'm going to do some work")
wg.Add(1)
go work(ctx)
wg.Wait()
fmt.Println("Finished. I'm going home")
}
func work(ctx context.Context) error {
defer wg.Done()
tr := &http.Transport{}
client := &http.Client{Transport: tr}
// anonymous struct to pack and unpack data in the channel
c := make(chan struct {
r *http.Response
err error
}, 1)
req, _ := http.NewRequest("GET", "http://localhost:1111", nil)
go func() {
resp, err := client.Do(req)
fmt.Println("Doing http request is a hard job")
pack := struct {
r *http.Response
err error
}{resp, err}
c <- pack
}()
select {
case <-ctx.Done():
tr.CancelRequest(req)
<-c // Wait for client.Do
fmt.Println("Cancel the context")
return ctx.Err()
case ok := <-c:
err := ok.err
resp := ok.r
if err != nil {
fmt.Println("Error ", err)
return err
}
defer resp.Body.Close()
out, _ := ioutil.ReadAll(resp.Body)
fmt.Printf("Server Response: %s\n", out)
}
return nil
}
$ go run work.go
Hey, I'm going to do some work
Doing http request is a hard job
Server Response: Go! quick 1
Finished. I'm going home
$ go run work.go
Hey, I'm going to do some work
Doing http request is a hard job
Cancel the context
Finished. I'm going home
As you can see in the output, we avoid the slow responses from the server.
In the client, the tcp connection is canceled, so it is not left busy waiting for a slow response and we don't waste resources.
Happy coding, gophers!